From e41202e5fd078bada9a8d1070829a7f125127e1e Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Thu, 27 Feb 2025 17:58:08 +0100 Subject: [PATCH 01/10] Simplified passing s3 instructions for iceberg writing --- .../util/TableParquetWriterOptionsTest.java | 5 +- .../deephaven/iceberg/base/IcebergUtils.java | 12 ++++ .../DataInstructionsProviderLoader.java | 2 + .../iceberg/layout/IcebergBaseLayout.java | 25 +++----- .../iceberg/layout/IcebergFlatLayout.java | 8 ++- .../IcebergKeyValuePartitionedLayout.java | 8 ++- .../iceberg/util/IcebergReadInstructions.java | 3 +- .../iceberg/util/IcebergTableAdapter.java | 32 +++++++++- .../iceberg/util/IcebergTableWriter.java | 61 +++++++++++++++---- .../util/TableParquetWriterOptions.java | 9 ++- .../iceberg/util/TableWriterOptions.java | 5 +- .../iceberg/junit5/SqliteCatalogBase.java | 39 +++++++++++- py/server/deephaven/experimental/iceberg.py | 6 +- 13 files changed, 172 insertions(+), 43 deletions(-) diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/TableParquetWriterOptionsTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/TableParquetWriterOptionsTest.java index 01cd3501878..03de3e1687d 100644 --- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/TableParquetWriterOptionsTest.java +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/TableParquetWriterOptionsTest.java @@ -5,6 +5,7 @@ import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.TableDefinition; +import io.deephaven.extensions.s3.S3Instructions; import io.deephaven.parquet.table.ParquetInstructions; import org.junit.jupiter.api.Test; @@ -144,8 +145,9 @@ void toParquetInstructionTest() { ColumnDefinition.ofInt("PC2").withPartitioning(), ColumnDefinition.ofLong("I")); final Map fieldIdToName = Map.of(2, "field2", 3, "field3"); + final S3Instructions s3Instructions = S3Instructions.builder().build(); final ParquetInstructions parquetInstructions = writeInstructions.toParquetInstructions( - null, definition, fieldIdToName); + null, definition, fieldIdToName, s3Instructions); assertThat(parquetInstructions.getCompressionCodecName()).isEqualTo("GZIP"); assertThat(parquetInstructions.getMaximumDictionaryKeys()).isEqualTo(100); @@ -156,5 +158,6 @@ void toParquetInstructionTest() { assertThat(parquetInstructions.getFieldId("field3")).hasValue(3); assertThat(parquetInstructions.onWriteCompleted()).isEmpty(); assertThat(parquetInstructions.getTableDefinition()).hasValue(definition); + assertThat(parquetInstructions.getSpecialInstructions()).isEqualTo(s3Instructions); } } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java index 06ad3bbbfe2..e5ea54b163d 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java @@ -3,9 +3,11 @@ // package io.deephaven.iceberg.base; +import io.deephaven.base.FileUtils; import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.TableDefinition; import io.deephaven.engine.table.impl.locations.TableDataException; +import io.deephaven.iceberg.relative.RelativeFileIO; import io.deephaven.iceberg.util.IcebergReadInstructions; import org.apache.iceberg.DataFile; import org.apache.iceberg.ManifestContent; @@ -21,6 +23,7 @@ import org.apache.iceberg.catalog.SupportsNamespaces; import 
org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.NamespaceNotEmptyException; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.jetbrains.annotations.NotNull; @@ -29,6 +32,7 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.math.BigDecimal; +import java.net.URI; import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; @@ -311,4 +315,12 @@ public static void verifyPartitioningColumns( } } } + + public static String path(String path, FileIO io) { + return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path; + } + + public static URI locationUri(Table table) { + return FileUtils.convertToURI(path(table.location(), table.io()), true); + } } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderLoader.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderLoader.java index 1198ce03823..41f477a2d33 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderLoader.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/internal/DataInstructionsProviderLoader.java @@ -4,6 +4,7 @@ package io.deephaven.iceberg.internal; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.util.ArrayList; import java.util.List; @@ -82,6 +83,7 @@ private DataInstructionsProviderLoader( * @param uriScheme The URI scheme * @return A data instructions object for the given URI scheme or null if one cannot be found */ + @Nullable public Object load(@NotNull final String uriScheme) { for (final DataInstructionsProviderPlugin plugin : providers) { final Object pluginInstructions = plugin.createInstructions(uriScheme, properties); diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java index b5feea6f80a..0b8a800d257 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java @@ -10,7 +10,6 @@ import io.deephaven.iceberg.base.IcebergUtils; import io.deephaven.iceberg.location.IcebergTableLocationKey; import io.deephaven.iceberg.location.IcebergTableParquetLocationKey; -import io.deephaven.iceberg.relative.RelativeFileIO; import io.deephaven.iceberg.util.IcebergReadInstructions; import io.deephaven.iceberg.util.IcebergTableAdapter; import io.deephaven.parquet.table.ParquetInstructions; @@ -20,19 +19,18 @@ import org.apache.iceberg.*; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; -import org.apache.iceberg.io.FileIO; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; import java.net.URI; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.UUID; import java.util.function.Consumer; import java.util.stream.Stream; import static io.deephaven.iceberg.base.IcebergUtils.allManifestFiles; +import static io.deephaven.iceberg.base.IcebergUtils.path; public abstract class IcebergBaseLayout implements TableLocationKeyFinder { /** @@ -113,11 +111,15 @@ protected IcebergTableLocationKey locationKey( /** * @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table. 
* @param instructions The instructions for customizations while reading.
+ * @param dataInstructionsProvider The provider for special instructions, to be used if special instructions are not
+ * provided in the {@code instructions}.
+ * @param tableLocationUriScheme The URI scheme for the table location.
 */
 public IcebergBaseLayout(
 @NotNull final IcebergTableAdapter tableAdapter,
 @NotNull final IcebergReadInstructions instructions,
- @NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
+ @NotNull final DataInstructionsProviderLoader dataInstructionsProvider,
+ @NotNull final String tableLocationUriScheme) {
 this.tableAdapter = tableAdapter;
 {
 UUID uuid;
@@ -135,11 +137,11 @@ public IcebergBaseLayout(
 this.snapshot = tableAdapter.getSnapshot(instructions);
 this.tableDef = tableAdapter.definition(instructions);
- this.uriScheme = locationUri(tableAdapter.icebergTable()).getScheme();
+ this.uriScheme = tableLocationUriScheme;
 // Add the data instructions if provided as part of the IcebergReadInstructions, or else attempt to create
 // data instructions from the properties collection and URI scheme.
 final Object specialInstructions = instructions.dataInstructions()
- .orElseGet(() -> dataInstructionsProvider.load(uriScheme));
+ .orElseGet(() -> dataInstructionsProvider.load(tableLocationUriScheme));
 {
 // Start with user-supplied instructions (if provided).
 final ParquetInstructions.Builder builder = new ParquetInstructions.Builder();
@@ -158,19 +160,12 @@ public IcebergBaseLayout(
 }
 this.parquetInstructions = builder.build();
 }
- this.channelsProvider = SeekableChannelsProviderLoader.getInstance().load(uriScheme, specialInstructions);
+ this.channelsProvider =
+ SeekableChannelsProviderLoader.getInstance().load(tableLocationUriScheme, specialInstructions);
 }
 
 abstract IcebergTableLocationKey keyFromDataFile(ManifestFile manifestFile, DataFile dataFile, URI fileUri);
 
- private static String path(String path, FileIO io) {
- return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path;
- }
-
- private static URI locationUri(Table table) {
- return FileUtils.convertToURI(path(table.location(), table.io()), true);
- }
-
 private static URI dataFileUri(Table table, DataFile dataFile) {
 return FileUtils.convertToURI(path(dataFile.path().toString(), table.io()), false);
 }
diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java
index 6957de21b87..58c3c61a110 100644
--- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java
+++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java
@@ -21,12 +21,16 @@ public final class IcebergFlatLayout extends IcebergBaseLayout {
 /**
 * @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
 * @param instructions The instructions for customizations while reading.
+ * @param dataInstructionsProvider The provider for special instructions, to be used if special instructions are not
+ * provided in the {@code instructions}.
+ * @param tableLocationUriScheme The URI scheme for the table location.
*/
 public IcebergFlatLayout(
 @NotNull final IcebergTableAdapter tableAdapter,
 @NotNull final IcebergReadInstructions instructions,
- @NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
- super(tableAdapter, instructions, dataInstructionsProvider);
+ @NotNull final DataInstructionsProviderLoader dataInstructionsProvider,
+ @NotNull final String tableLocationUriScheme) {
+ super(tableAdapter, instructions, dataInstructionsProvider, tableLocationUriScheme);
 }
 
 @Override
diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java
index 9c60799a654..8129087b058 100644
--- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java
+++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java
@@ -41,13 +41,17 @@ private IdentityPartitioningColData(String name, Class type, int index) {
 * @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
 * @param partitionSpec The Iceberg {@link PartitionSpec partition spec} for the table.
 * @param instructions The instructions for customizations while reading.
+ * @param dataInstructionsProvider The provider for special instructions, to be used if special instructions are not
+ * provided in the {@code instructions}.
+ * @param tableLocationUriScheme The URI scheme for the table location.
 */
 public IcebergKeyValuePartitionedLayout(
 @NotNull final IcebergTableAdapter tableAdapter,
 @NotNull final PartitionSpec partitionSpec,
 @NotNull final IcebergReadInstructions instructions,
- @NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
- super(tableAdapter, instructions, dataInstructionsProvider);
+ @NotNull final DataInstructionsProviderLoader dataInstructionsProvider,
+ @NotNull final String tableLocationUriScheme) {
+ super(tableAdapter, instructions, dataInstructionsProvider, tableLocationUriScheme);
 
 // We can assume due to upstream validation that there are no duplicate names (after renaming) that are included
 // in the output definition, so we can ignore duplicates.
diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergReadInstructions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergReadInstructions.java
index 21384521005..3db9c70f073 100644
--- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergReadInstructions.java
+++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergReadInstructions.java
@@ -37,7 +37,8 @@ public static Builder builder() {
 
 /**
 * The data instructions to use for reading the Iceberg data files (might be S3Instructions or other cloud
- * provider-specific instructions).
+ * provider-specific instructions). If not provided, data instructions will be derived from the properties of the
+ * catalog.
*/ public abstract Optional dataInstructions(); diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java index f2b4dbbd0dc..e63946361e7 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java @@ -43,6 +43,7 @@ import java.util.stream.Collectors; import static io.deephaven.iceberg.base.IcebergUtils.convertToDHType; +import static io.deephaven.iceberg.base.IcebergUtils.locationUri; /** * This class manages an Iceberg {@link org.apache.iceberg.Table table} and provides methods to interact with it. @@ -61,6 +62,12 @@ public class IcebergTableAdapter { private final TableIdentifier tableIdentifier; private final DataInstructionsProviderLoader dataInstructionsProviderLoader; + /** + * The URI scheme from the Table {@link org.apache.iceberg.Table#location() location}. This is computed lazily and + * should be accessed via {@link #getScheme()}. + */ + private volatile String uriScheme; + public IcebergTableAdapter( final Catalog catalog, final TableIdentifier tableIdentifier, @@ -389,11 +396,12 @@ public IcebergTable table(@NotNull final IcebergReadInstructions readInstruction final IcebergBaseLayout keyFinder; if (partitionSpec.isUnpartitioned()) { // Create the flat layout location key finder - keyFinder = new IcebergFlatLayout(this, updatedInstructions, dataInstructionsProviderLoader); + keyFinder = new IcebergFlatLayout(this, updatedInstructions, dataInstructionsProviderLoader, + getScheme()); } else { // Create the partitioning column location key finder keyFinder = new IcebergKeyValuePartitionedLayout(this, partitionSpec, updatedInstructions, - dataInstructionsProviderLoader); + dataInstructionsProviderLoader, getScheme()); } if (updatedInstructions.updateMode().updateType() == IcebergUpdateMode.IcebergUpdateType.STATIC) { @@ -584,6 +592,24 @@ private static TableDefinition fromSchema( * @return A new instance of {@link IcebergTableWriter} configured with the provided options. */ public IcebergTableWriter tableWriter(final TableWriterOptions tableWriterOptions) { - return new IcebergTableWriter(tableWriterOptions, this); + return new IcebergTableWriter(tableWriterOptions, this, dataInstructionsProviderLoader, getScheme()); + } + + /** + * Get the URI scheme from the Table {@link org.apache.iceberg.Table#location() location}. 
+ */
+ private String getScheme() {
+ String localScheme;
+ if ((localScheme = uriScheme) != null) {
+ return localScheme;
+ }
+ synchronized (this) {
+ if ((localScheme = uriScheme) != null) {
+ return localScheme;
+ }
+ localScheme = locationUri(table).getScheme();
+ uriScheme = localScheme;
+ return localScheme;
+ }
+ }
 }
diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java
index 5121134c14f..35ca0ecaaa7 100644
--- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java
+++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java
@@ -12,6 +12,7 @@
 import io.deephaven.engine.table.Table;
 import io.deephaven.engine.table.TableDefinition;
 import io.deephaven.engine.table.impl.locations.TableDataException;
+import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
 import io.deephaven.parquet.table.CompletedParquetWrite;
 import io.deephaven.parquet.table.ParquetInstructions;
 import io.deephaven.parquet.table.ParquetTools;
@@ -49,6 +50,7 @@
 import java.util.Random;
 
 import static io.deephaven.iceberg.base.IcebergUtils.convertToIcebergType;
+import static io.deephaven.iceberg.base.IcebergUtils.locationUri;
 import static io.deephaven.iceberg.base.IcebergUtils.verifyPartitioningColumns;
 import static io.deephaven.iceberg.base.IcebergUtils.verifyRequiredFields;
@@ -101,15 +103,37 @@ public class IcebergTableWriter {
 */
 private final OutputFileFactory outputFileFactory;
 
+ /**
+ * The instructions to use for writing the parquet files, populated up-front and used for all writes.
+ */
+ private final ParquetInstructions parquetInstructions;
+
+ /**
+ * The list of completed writes to parquet files. This list is populated by the callback provided in the parquet
+ * instructions. This list will be cleared after each write operation to reuse for the next write.
+ */
+ private final List parquetFilesWritten;
+
 /**
 * Characters to be used for generating random variable names of length {@link #VARIABLE_NAME_LENGTH}.
 */
 private static final String CHARACTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
 
 private static final int VARIABLE_NAME_LENGTH = 6;
 
+ /**
+ * Create a new Iceberg table writer instance.
+ *
+ * @param tableWriterOptions The options to configure the behavior of this writer instance.
+ * @param tableAdapter The Iceberg table adapter corresponding to the Iceberg table to write to.
+ * @param dataInstructionsProvider The provider for special instructions, to be used if special instructions are not
+ * provided in the {@code tableWriterOptions}.
+ * @param tableLocationUriScheme The URI scheme for the table location.
+ */ IcebergTableWriter( final TableWriterOptions tableWriterOptions, - final IcebergTableAdapter tableAdapter) { + final IcebergTableAdapter tableAdapter, + final DataInstructionsProviderLoader dataInstructionsProvider, + final String tableLocationUriScheme) { this.tableWriterOptions = verifyWriterOptions(tableWriterOptions); this.table = tableAdapter.icebergTable(); @@ -131,6 +155,16 @@ public class IcebergTableWriter { outputFileFactory = OutputFileFactory.builderFor(table, 0, 0) .format(FileFormat.PARQUET) .build(); + + final Object specialInstructions = tableWriterOptions.dataInstructions() + .orElseGet(() -> dataInstructionsProvider.load(tableLocationUriScheme)); + + // Build the parquet instructions + parquetFilesWritten = new ArrayList<>(); + final ParquetInstructions.OnWriteCompleted onWriteCompleted = parquetFilesWritten::add; + parquetInstructions = this.tableWriterOptions.toParquetInstructions( + onWriteCompleted, tableDefinition, fieldIdToColumnName, specialInstructions); + } private static TableParquetWriterOptions verifyWriterOptions( @@ -285,7 +319,6 @@ public List writeDataFiles(@NotNull final IcebergWriteInstructions wri final List partitionPaths = writeInstructions.partitionPaths(); verifyPartitionPaths(tableSpec, partitionPaths); final List partitionData; - final List parquetFileInfo; // Start a new query scope to avoid polluting the existing query scope with new parameters added for // partitioning columns try (final SafeCloseable _ignore = @@ -293,9 +326,14 @@ public List writeDataFiles(@NotNull final IcebergWriteInstructions wri final Pair, List> ret = partitionDataFromPaths(tableSpec, partitionPaths); partitionData = ret.getFirst(); final List dhTableUpdateStrings = ret.getSecond(); - parquetFileInfo = writeParquet(partitionData, dhTableUpdateStrings, writeInstructions); + writeParquet(partitionData, dhTableUpdateStrings, writeInstructions); } - return dataFilesFromParquet(parquetFileInfo, partitionData); + final List dataFiles = dataFilesFromParquet(partitionData); + + // Clear the list of parquet files written to reuse the same list for the next write + parquetFilesWritten.clear(); + + return dataFiles; } /** @@ -439,7 +477,7 @@ private static String generateRandomAlphabetString(final int length) { } @NotNull - private List writeParquet( + private void writeParquet( @NotNull final List partitionDataList, @NotNull final List dhTableUpdateStrings, @NotNull final IcebergWriteInstructions writeInstructions) { @@ -455,12 +493,6 @@ private List writeParquet( Require.eqZero(dhTableUpdateStrings.size(), "dhTableUpdateStrings.size()"); } - // Build the parquet instructions - final List parquetFilesWritten = new ArrayList<>(dhTables.size()); - final ParquetInstructions.OnWriteCompleted onWriteCompleted = parquetFilesWritten::add; - final ParquetInstructions parquetInstructions = tableWriterOptions.toParquetInstructions( - onWriteCompleted, tableDefinition, fieldIdToColumnName); - // Write the data to parquet files for (int idx = 0; idx < dhTables.size(); idx++) { Table dhTable = dhTables.get(idx); @@ -478,7 +510,11 @@ private List writeParquet( // TODO (deephaven-core#6343): Set writeDefault() values for required columns that not present in the table ParquetTools.writeTable(dhTable, newDataLocation, parquetInstructions); } - return parquetFilesWritten; + // At this point, the list of parquet files written should be populated by the callback + if (parquetFilesWritten.size() != dhTables.size()) { + throw new IllegalStateException("Expected " + dhTables.size() + " parquet 
files to be written, found " + + parquetFilesWritten.size()); + } } /** @@ -517,7 +553,6 @@ private void commit( * Generate a list of {@link DataFile} objects from a list of parquet files written. */ private List dataFilesFromParquet( - @NotNull final List parquetFilesWritten, @NotNull final List partitionDataList) { final int numFiles = parquetFilesWritten.size(); final List dataFiles = new ArrayList<>(numFiles); diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableParquetWriterOptions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableParquetWriterOptions.java index abc2c9cbefd..dacecaa2177 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableParquetWriterOptions.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableParquetWriterOptions.java @@ -8,6 +8,7 @@ import io.deephaven.parquet.table.ParquetInstructions; import org.immutables.value.Value; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; import java.util.Map; @@ -71,14 +72,18 @@ public int targetPageSize() { * @param onWriteCompleted The callback to be invoked after writing the parquet file. * @param tableDefinition The table definition to be populated inside the parquet file's schema * @param fieldIdToName Mapping of field id to field name, to be populated inside the parquet file's schema + * @param specialInstructions Special instructions to be passed to the parquet writer */ ParquetInstructions toParquetInstructions( @NotNull final ParquetInstructions.OnWriteCompleted onWriteCompleted, @NotNull final TableDefinition tableDefinition, - @NotNull final Map fieldIdToName) { + @NotNull final Map fieldIdToName, + @Nullable final Object specialInstructions) { final ParquetInstructions.Builder builder = new ParquetInstructions.Builder(); - dataInstructions().ifPresent(builder::setSpecialInstructions); + if (specialInstructions != null) { + builder.setSpecialInstructions(specialInstructions); + } // Add parquet writing specific instructions. builder.setTableDefinition(tableDefinition); diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableWriterOptions.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableWriterOptions.java index 5f37c400b99..cab53bde2e5 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableWriterOptions.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/TableWriterOptions.java @@ -24,8 +24,9 @@ public abstract class TableWriterOptions { public abstract TableDefinition tableDefinition(); /** - * The data instructions to use for reading/writing the Iceberg data files (might be S3Instructions or other cloud - * provider-specific instructions). + * The data instructions to use for writing the Iceberg data files (might be S3Instructions or other cloud + * provider-specific instructions). If not provided, data instructions will be derived from the properties of the + * catalog. 
*/ public abstract Optional dataInstructions(); diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java index 62c61d7aa5b..35bbbc06ea4 100644 --- a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java +++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java @@ -35,7 +35,6 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.types.Types; import org.apache.parquet.hadoop.metadata.ParquetMetadata; -import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.jetbrains.annotations.Nullable; import org.junit.jupiter.api.AfterEach; @@ -1037,4 +1036,42 @@ void testAutoRefreshingPartitionedAppend() throws InterruptedException { final Table expected2 = TableTools.merge(expected, part3.update("PC = `cat`")); assertTableEquals(expected2, fromIcebergRefreshing.select()); } + + @Test + void appendTableWithAndWithoutDataInstructionsTest() { + final Table source = TableTools.emptyTable(10) + .update("intCol = (int) 2 * i + 10", + "doubleCol = (double) 2.5 * i + 10"); + final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable"); + final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, source.getDefinition()); + { + // Following will add data instructions to the table writer + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder() + .tableDefinition(source.getDefinition()) + .build()); + tableWriter.append(IcebergWriteInstructions.builder() + .addTables(source) + .build()); + } + + Table fromIceberg = tableAdapter.table(); + Table expected = source; + assertTableEquals(expected, fromIceberg); + verifySnapshots(tableIdentifier, List.of("append")); + + { + // Skip adding the data instructions to the table writer, should derive them from the catalog + final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableParquetWriterOptions.builder() + .tableDefinition(source.getDefinition()) + .build()); + tableWriter.append(IcebergWriteInstructions.builder() + .addTables(source) + .build()); + } + + fromIceberg = tableAdapter.table(); + expected = TableTools.merge(source, source); + assertTableEquals(expected, fromIceberg); + verifySnapshots(tableIdentifier, List.of("append", "append")); + } } diff --git a/py/server/deephaven/experimental/iceberg.py b/py/server/deephaven/experimental/iceberg.py index 3265ab65e2a..9b56c0666c4 100644 --- a/py/server/deephaven/experimental/iceberg.py +++ b/py/server/deephaven/experimental/iceberg.py @@ -107,7 +107,8 @@ def __init__(self, the definition is inferred from the Iceberg schema. Setting a definition guarantees the returned table will have that definition. This is useful for specifying a subset of the Iceberg schema columns. data_instructions (Optional[s3.S3Instructions]): Special instructions for reading data files, useful when - reading files from a non-local file system, like S3. + reading files from a non-local file system, like S3. If omitted, the data instructions will be derived + from the properties of the catalog. column_renames (Optional[Dict[str, str]]): A dictionary of old to new column names that will be renamed in the output table. update_mode (Optional[IcebergUpdateMode]): The update mode for the table. 
If omitted, the default update @@ -313,6 +314,9 @@ def __init__(self, `None`, which means use 2^20 (1,048,576) target_page_size (Optional[int]): the target Parquet file page size in bytes, if not specified. Defaults to `None`, which means use 2^20 bytes (1 MiB) + data_instructions (Optional[s3.S3Instructions]): Special instructions for writing data files, useful when + writing files to a non-local file system, like S3. If omitted, the data instructions will be derived + from the properties of the catalog. Raises: DHError: If unable to build the object. From 8a78496d3dd8a7a5ee60987f04860028d12a5c53 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 28 Feb 2025 17:14:54 +0100 Subject: [PATCH 02/10] Minor tweaks to unit tests --- .../iceberg/util/IcebergTableWriter.java | 24 +++++++++---------- .../iceberg/junit5/SqliteCatalogBase.java | 9 ++++--- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java index 35ca0ecaaa7..82ea8d92999 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java @@ -50,7 +50,6 @@ import java.util.Random; import static io.deephaven.iceberg.base.IcebergUtils.convertToIcebergType; -import static io.deephaven.iceberg.base.IcebergUtils.locationUri; import static io.deephaven.iceberg.base.IcebergUtils.verifyPartitioningColumns; import static io.deephaven.iceberg.base.IcebergUtils.verifyRequiredFields; @@ -328,12 +327,7 @@ public List writeDataFiles(@NotNull final IcebergWriteInstructions wri final List dhTableUpdateStrings = ret.getSecond(); writeParquet(partitionData, dhTableUpdateStrings, writeInstructions); } - final List dataFiles = dataFilesFromParquet(partitionData); - - // Clear the list of parquet files written to reuse the same list for the next write - parquetFilesWritten.clear(); - - return dataFiles; + return dataFilesFromParquet(partitionData); } /** @@ -476,7 +470,6 @@ private static String generateRandomAlphabetString(final int length) { return stringBuilder.toString(); } - @NotNull private void writeParquet( @NotNull final List partitionDataList, @NotNull final List dhTableUpdateStrings, @@ -493,6 +486,12 @@ private void writeParquet( Require.eqZero(dhTableUpdateStrings.size(), "dhTableUpdateStrings.size()"); } + // At this point, the list of parquet files written should be empty and will be populated by the callback + if (!parquetFilesWritten.isEmpty()) { + throw new IllegalStateException("List of parquet files written should be empty before writing new data, " + + "found " + parquetFilesWritten.size() + " files"); + } + // Write the data to parquet files for (int idx = 0; idx < dhTables.size(); idx++) { Table dhTable = dhTables.get(idx); @@ -510,11 +509,6 @@ private void writeParquet( // TODO (deephaven-core#6343): Set writeDefault() values for required columns that not present in the table ParquetTools.writeTable(dhTable, newDataLocation, parquetInstructions); } - // At this point, the list of parquet files written should be populated by the callback - if (parquetFilesWritten.size() != dhTables.size()) { - throw new IllegalStateException("Expected " + dhTables.size() + " parquet files to be written, found " + - parquetFilesWritten.size()); - } } /** @@ -569,6 +563,10 @@ private List dataFilesFromParquet( } dataFiles.add(dataFileBuilder.build()); } 
+ + // Clear the list of parquet files written to reuse the same list for the next write + parquetFilesWritten.clear(); + return dataFiles; } } diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java index 35bbbc06ea4..c23ee164c18 100644 --- a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java +++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java @@ -55,6 +55,8 @@ import static io.deephaven.engine.testutil.TstUtils.assertTableEquals; import static io.deephaven.engine.util.TableTools.col; import static io.deephaven.engine.util.TableTools.doubleCol; +import static io.deephaven.engine.util.TableTools.intCol; +import static io.deephaven.engine.util.TableTools.longCol; import static org.apache.parquet.schema.LogicalTypeAnnotation.intType; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; @@ -1039,9 +1041,10 @@ void testAutoRefreshingPartitionedAppend() throws InterruptedException { @Test void appendTableWithAndWithoutDataInstructionsTest() { - final Table source = TableTools.emptyTable(10) - .update("intCol = (int) 2 * i + 10", - "doubleCol = (double) 2.5 * i + 10"); + final Table source = TableTools.newTable( + intCol("intCol", 15, 0, 32, 33, 19), + doubleCol("doubleCol", 10.5, 2.5, 3.5, 40.5, 0.5), + longCol("longCol", 20L, 50L, 0L, 10L, 5L)); final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable"); final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, source.getDefinition()); { From 397ea56be33308b8c7571b8025d30543dc2a253c Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Mon, 3 Mar 2025 20:47:10 +0100 Subject: [PATCH 03/10] Review with Devin --- .../util/TableParquetWriterOptionsTest.java | 10 +++++-- .../deephaven/iceberg/base/IcebergUtils.java | 4 +-- .../iceberg/layout/IcebergBaseLayout.java | 2 +- .../iceberg/util/IcebergTableAdapter.java | 30 ++++--------------- .../iceberg/util/IcebergTableWriter.java | 23 +++++++------- 5 files changed, 28 insertions(+), 41 deletions(-) diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/TableParquetWriterOptionsTest.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/TableParquetWriterOptionsTest.java index 03de3e1687d..fc9f236a1fb 100644 --- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/TableParquetWriterOptionsTest.java +++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/TableParquetWriterOptionsTest.java @@ -6,6 +6,7 @@ import io.deephaven.engine.table.ColumnDefinition; import io.deephaven.engine.table.TableDefinition; import io.deephaven.extensions.s3.S3Instructions; +import io.deephaven.parquet.table.CompletedParquetWrite; import io.deephaven.parquet.table.ParquetInstructions; import org.junit.jupiter.api.Test; @@ -145,9 +146,12 @@ void toParquetInstructionTest() { ColumnDefinition.ofInt("PC2").withPartitioning(), ColumnDefinition.ofLong("I")); final Map fieldIdToName = Map.of(2, "field2", 3, "field3"); - final S3Instructions s3Instructions = S3Instructions.builder().build(); + final S3Instructions s3Instructions = S3Instructions.builder().regionName("test-region").build(); + final ParquetInstructions.OnWriteCompleted onWriteCompleted = + (final CompletedParquetWrite completedParquetWrite) -> { + /* Do nothing 
*/ }; final ParquetInstructions parquetInstructions = writeInstructions.toParquetInstructions( - null, definition, fieldIdToName, s3Instructions); + onWriteCompleted, definition, fieldIdToName, s3Instructions); assertThat(parquetInstructions.getCompressionCodecName()).isEqualTo("GZIP"); assertThat(parquetInstructions.getMaximumDictionaryKeys()).isEqualTo(100); @@ -156,7 +160,7 @@ void toParquetInstructionTest() { assertThat(parquetInstructions.getFieldId("field1")).isEmpty(); assertThat(parquetInstructions.getFieldId("field2")).hasValue(2); assertThat(parquetInstructions.getFieldId("field3")).hasValue(3); - assertThat(parquetInstructions.onWriteCompleted()).isEmpty(); + assertThat(parquetInstructions.onWriteCompleted()).hasValue(onWriteCompleted); assertThat(parquetInstructions.getTableDefinition()).hasValue(definition); assertThat(parquetInstructions.getSpecialInstructions()).isEqualTo(s3Instructions); } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java index e5ea54b163d..7dbbebf25a4 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java @@ -316,11 +316,11 @@ public static void verifyPartitioningColumns( } } - public static String path(String path, FileIO io) { + public static String path(@NotNull final String path, @NotNull final FileIO io) { return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path; } - public static URI locationUri(Table table) { + public static URI locationUri(@NotNull final Table table) { return FileUtils.convertToURI(path(table.location(), table.io()), true); } } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java index 0b8a800d257..1dcabd6c0f3 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java @@ -166,7 +166,7 @@ public IcebergBaseLayout( abstract IcebergTableLocationKey keyFromDataFile(ManifestFile manifestFile, DataFile dataFile, URI fileUri); - private static URI dataFileUri(Table table, DataFile dataFile) { + private static URI dataFileUri(@NotNull final Table table, @NotNull final DataFile dataFile) { return FileUtils.convertToURI(path(dataFile.path().toString(), table.io()), false); } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java index e63946361e7..5802d740207 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java @@ -63,10 +63,9 @@ public class IcebergTableAdapter { private final DataInstructionsProviderLoader dataInstructionsProviderLoader; /** - * The URI scheme from the Table {@link org.apache.iceberg.Table#location() location}. This is computed lazily and - * should be accessed via {@link #getScheme()}. + * The URI scheme for the table location. 
*/ - private volatile String uriScheme; + private final String uriScheme; public IcebergTableAdapter( final Catalog catalog, @@ -77,6 +76,7 @@ public IcebergTableAdapter( this.table = table; this.tableIdentifier = tableIdentifier; this.dataInstructionsProviderLoader = dataInstructionsProviderLoader; + this.uriScheme = locationUri(table).getScheme(); } /** @@ -397,11 +397,11 @@ public IcebergTable table(@NotNull final IcebergReadInstructions readInstruction if (partitionSpec.isUnpartitioned()) { // Create the flat layout location key finder keyFinder = new IcebergFlatLayout(this, updatedInstructions, dataInstructionsProviderLoader, - getScheme()); + uriScheme); } else { // Create the partitioning column location key finder keyFinder = new IcebergKeyValuePartitionedLayout(this, partitionSpec, updatedInstructions, - dataInstructionsProviderLoader, getScheme()); + dataInstructionsProviderLoader, uriScheme); } if (updatedInstructions.updateMode().updateType() == IcebergUpdateMode.IcebergUpdateType.STATIC) { @@ -592,24 +592,6 @@ private static TableDefinition fromSchema( * @return A new instance of {@link IcebergTableWriter} configured with the provided options. */ public IcebergTableWriter tableWriter(final TableWriterOptions tableWriterOptions) { - return new IcebergTableWriter(tableWriterOptions, this, dataInstructionsProviderLoader, getScheme()); - } - - /** - * Get the URI scheme from the Table {@link org.apache.iceberg.Table#location() location}. - */ - private String getScheme() { - String localScheme; - if ((localScheme = uriScheme) != null) { - return localScheme; - } - synchronized (this) { - if ((localScheme = uriScheme) != null) { - return localScheme; - } - localScheme = locationUri(table).getScheme(); - uriScheme = localScheme; - return localScheme; - } + return new IcebergTableWriter(tableWriterOptions, this, dataInstructionsProviderLoader, uriScheme); } } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java index 82ea8d92999..c482e9629e6 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java @@ -317,6 +317,12 @@ public List writeDataFiles(@NotNull final IcebergWriteInstructions wri verifyCompatible(writeInstructions.tables(), nonPartitioningTableDefinition); final List partitionPaths = writeInstructions.partitionPaths(); verifyPartitionPaths(tableSpec, partitionPaths); + + if (!parquetFilesWritten.isEmpty()) { + throw new IllegalStateException("List of parquet files written should be empty before writing new data, " + + "found " + parquetFilesWritten.size() + " files"); + } + final List partitionData; // Start a new query scope to avoid polluting the existing query scope with new parameters added for // partitioning columns @@ -327,7 +333,12 @@ public List writeDataFiles(@NotNull final IcebergWriteInstructions wri final List dhTableUpdateStrings = ret.getSecond(); writeParquet(partitionData, dhTableUpdateStrings, writeInstructions); } - return dataFilesFromParquet(partitionData); + final List dataFiles = dataFilesFromParquet(partitionData); + + // Clear the list to reuse it for the next write + parquetFilesWritten.clear(); + + return dataFiles; } /** @@ -486,12 +497,6 @@ private void writeParquet( Require.eqZero(dhTableUpdateStrings.size(), "dhTableUpdateStrings.size()"); } - // At this point, the list of parquet files written 
should be empty and will be populated by the callback - if (!parquetFilesWritten.isEmpty()) { - throw new IllegalStateException("List of parquet files written should be empty before writing new data, " + - "found " + parquetFilesWritten.size() + " files"); - } - // Write the data to parquet files for (int idx = 0; idx < dhTables.size(); idx++) { Table dhTable = dhTables.get(idx); @@ -563,10 +568,6 @@ private List dataFilesFromParquet( } dataFiles.add(dataFileBuilder.build()); } - - // Clear the list of parquet files written to reuse the same list for the next write - parquetFilesWritten.clear(); - return dataFiles; } } From 61d4cbb716a2d1f7aa50cf3bf52aaa10280e413e Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Mon, 3 Mar 2025 21:24:14 +0100 Subject: [PATCH 04/10] Make the change non-breaking --- .../iceberg/layout/IcebergBaseLayout.java | 8 +++----- .../iceberg/layout/IcebergFlatLayout.java | 6 ++---- .../layout/IcebergKeyValuePartitionedLayout.java | 5 ++--- .../iceberg/util/IcebergTableAdapter.java | 14 ++++++++++---- .../deephaven/iceberg/util/IcebergTableWriter.java | 7 +++---- 5 files changed, 20 insertions(+), 20 deletions(-) diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java index fa173d30995..bf8b0bb6180 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java @@ -124,13 +124,11 @@ protected IcebergTableLocationKey locationKey( * @param instructions The instructions for customizations while reading. * @param dataInstructionsProvider The provider for special instructions, to be used if special instructions not * provided in the {@code instructions}. - * @param tableLocationUriScheme The URI scheme for the table location. */ public IcebergBaseLayout( @NotNull final IcebergTableAdapter tableAdapter, @NotNull final IcebergReadInstructions instructions, - @NotNull final DataInstructionsProviderLoader dataInstructionsProvider, - @NotNull final String tableLocationUriScheme) { + @NotNull final DataInstructionsProviderLoader dataInstructionsProvider) { this.tableAdapter = tableAdapter; this.instructions = instructions; this.dataInstructionsProvider = dataInstructionsProvider; @@ -150,11 +148,11 @@ public IcebergBaseLayout( this.snapshot = tableAdapter.getSnapshot(instructions); this.tableDef = tableAdapter.definition(instructions); - this.uriScheme = tableLocationUriScheme; + this.uriScheme = tableAdapter.getScheme(); // Add the data instructions if provided as part of the IcebergReadInstructions, or else attempt to create // data instructions from the properties collection and URI scheme. final Object specialInstructions = instructions.dataInstructions() - .orElseGet(() -> dataInstructionsProvider.load(tableLocationUriScheme)); + .orElseGet(() -> dataInstructionsProvider.load(uriScheme)); { // Start with user-supplied instructions (if provided). 
final ParquetInstructions.Builder builder = new ParquetInstructions.Builder(); diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java index 8845629bef9..6b78e4473a5 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java @@ -24,14 +24,12 @@ public final class IcebergFlatLayout extends IcebergBaseLayout { * @param instructions The instructions for customizations while reading. * @param dataInstructionsProvider The provider for special instructions, to be used if special instructions not * provided in the {@code instructions}. - * @param tableLocationUriScheme The URI scheme for the table location. */ public IcebergFlatLayout( @NotNull final IcebergTableAdapter tableAdapter, @NotNull final IcebergReadInstructions instructions, - @NotNull final DataInstructionsProviderLoader dataInstructionsProvider, - @NotNull final String tableLocationUriScheme) { - super(tableAdapter, instructions, dataInstructionsProvider, tableLocationUriScheme); + @NotNull final DataInstructionsProviderLoader dataInstructionsProvider) { + super(tableAdapter, instructions, dataInstructionsProvider); } @Override diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java index ae533d068b9..a53b39a6660 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java @@ -50,9 +50,8 @@ public IcebergKeyValuePartitionedLayout( @NotNull final IcebergTableAdapter tableAdapter, @NotNull final PartitionSpec partitionSpec, @NotNull final IcebergReadInstructions instructions, - @NotNull final DataInstructionsProviderLoader dataInstructionsProvider, - @NotNull final String tableLocationUriScheme) { - super(tableAdapter, instructions, dataInstructionsProvider, tableLocationUriScheme); + @NotNull final DataInstructionsProviderLoader dataInstructionsProvider) { + super(tableAdapter, instructions, dataInstructionsProvider); // We can assume due to upstream validation that there are no duplicate names (after renaming) that are included // in the output definition, so we can ignore duplicates. 
diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java index 5802d740207..044d8043cdb 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableAdapter.java @@ -396,12 +396,11 @@ public IcebergTable table(@NotNull final IcebergReadInstructions readInstruction final IcebergBaseLayout keyFinder; if (partitionSpec.isUnpartitioned()) { // Create the flat layout location key finder - keyFinder = new IcebergFlatLayout(this, updatedInstructions, dataInstructionsProviderLoader, - uriScheme); + keyFinder = new IcebergFlatLayout(this, updatedInstructions, dataInstructionsProviderLoader); } else { // Create the partitioning column location key finder keyFinder = new IcebergKeyValuePartitionedLayout(this, partitionSpec, updatedInstructions, - dataInstructionsProviderLoader, uriScheme); + dataInstructionsProviderLoader); } if (updatedInstructions.updateMode().updateType() == IcebergUpdateMode.IcebergUpdateType.STATIC) { @@ -592,6 +591,13 @@ private static TableDefinition fromSchema( * @return A new instance of {@link IcebergTableWriter} configured with the provided options. */ public IcebergTableWriter tableWriter(final TableWriterOptions tableWriterOptions) { - return new IcebergTableWriter(tableWriterOptions, this, dataInstructionsProviderLoader, uriScheme); + return new IcebergTableWriter(tableWriterOptions, this, dataInstructionsProviderLoader); + } + + /** + * Get the URI scheme for the table location. + */ + public String getScheme() { + return uriScheme; } } diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java index c482e9629e6..6bc1346edb1 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java @@ -126,13 +126,11 @@ public class IcebergTableWriter { * @param tableAdapter The Iceberg table adapter corresponding to the Iceberg table to write to. * @param dataInstructionsProvider The provider for special instructions, to be used if special instructions not * provided in the {@code tableWriterOptions}. - * @param tableLocationUriScheme The URI scheme for the table location. 
*/ IcebergTableWriter( final TableWriterOptions tableWriterOptions, final IcebergTableAdapter tableAdapter, - final DataInstructionsProviderLoader dataInstructionsProvider, - final String tableLocationUriScheme) { + final DataInstructionsProviderLoader dataInstructionsProvider) { this.tableWriterOptions = verifyWriterOptions(tableWriterOptions); this.table = tableAdapter.icebergTable(); @@ -155,8 +153,9 @@ public class IcebergTableWriter { .format(FileFormat.PARQUET) .build(); + final String uriScheme = tableAdapter.getScheme(); final Object specialInstructions = tableWriterOptions.dataInstructions() - .orElseGet(() -> dataInstructionsProvider.load(tableLocationUriScheme)); + .orElseGet(() -> dataInstructionsProvider.load(uriScheme)); // Build the parquet instructions parquetFilesWritten = new ArrayList<>(); From 0ffcee2bb7029f6838ccc44d1e1c067f3d03d054 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Mon, 3 Mar 2025 21:26:20 +0100 Subject: [PATCH 05/10] Minor changes in javadoc --- .../iceberg/layout/IcebergKeyValuePartitionedLayout.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java index a53b39a6660..5123b26422b 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java @@ -44,7 +44,6 @@ private IdentityPartitioningColData(String name, Class type, int index) { * @param instructions The instructions for customizations while reading. * @param dataInstructionsProvider The provider for special instructions, to be used if special instructions not * provided in the {@code instructions}. - * @param tableLocationUriScheme The URI scheme for the table location. */ public IcebergKeyValuePartitionedLayout( @NotNull final IcebergTableAdapter tableAdapter, From 229c172ca0b60b5d4434a44ec85de23415f56ceb Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 7 Mar 2025 18:28:35 +0100 Subject: [PATCH 06/10] Review with Devin contd. 
--- .../iceberg/layout/IcebergBaseLayout.java | 8 ++------ .../iceberg/util/IcebergTableAdapter.java | 16 +++++++--------- .../iceberg/util/IcebergTableWriter.java | 2 +- 3 files changed, 10 insertions(+), 16 deletions(-) diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java index bf8b0bb6180..e232a7ad741 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java @@ -70,11 +70,6 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder dataInstructionsProvider.load(uriScheme)); From 8aec144729a39ad06de8e422ec5305aef19fcc0d Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 7 Mar 2025 18:55:12 +0100 Subject: [PATCH 07/10] Reverting some state management related changes --- .../iceberg/util/IcebergTableWriter.java | 43 +++++++------------ 1 file changed, 15 insertions(+), 28 deletions(-) diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java index ab5db0636e6..da601a709dd 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java @@ -103,15 +103,10 @@ public class IcebergTableWriter { private final OutputFileFactory outputFileFactory; /** - * The instructions to use for writing the parquet files, populated up-front and used for all writes. + * The special instructions to use for writing the Iceberg data files (might be S3Instructions or other cloud + * provider-specific instructions). */ - private final ParquetInstructions parquetInstructions; - - /** - * The list of completed writes to parquet files. This list is populated by the callback provided in the parquet - * instructions. This list will be cleared after each write operation to reuse for the next write. - */ - private final List parquetFilesWritten; + private final Object specialInstructions; /** * Characters to be used for generating random variable names of length {@link #VARIABLE_NAME_LENGTH}. 
@@ -154,15 +149,9 @@ public class IcebergTableWriter { .build(); final String uriScheme = tableAdapter.locationUri().getScheme(); - final Object specialInstructions = tableWriterOptions.dataInstructions() + this.specialInstructions = tableWriterOptions.dataInstructions() .orElseGet(() -> dataInstructionsProvider.load(uriScheme)); - // Build the parquet instructions - parquetFilesWritten = new ArrayList<>(); - final ParquetInstructions.OnWriteCompleted onWriteCompleted = parquetFilesWritten::add; - parquetInstructions = this.tableWriterOptions.toParquetInstructions( - onWriteCompleted, tableDefinition, fieldIdToColumnName, specialInstructions); - } private static TableParquetWriterOptions verifyWriterOptions( @@ -317,12 +306,8 @@ public List writeDataFiles(@NotNull final IcebergWriteInstructions wri final List partitionPaths = writeInstructions.partitionPaths(); verifyPartitionPaths(tableSpec, partitionPaths); - if (!parquetFilesWritten.isEmpty()) { - throw new IllegalStateException("List of parquet files written should be empty before writing new data, " + - "found " + parquetFilesWritten.size() + " files"); - } - final List partitionData; + final List parquetFileInfo; // Start a new query scope to avoid polluting the existing query scope with new parameters added for // partitioning columns try (final SafeCloseable _ignore = @@ -330,14 +315,9 @@ public List writeDataFiles(@NotNull final IcebergWriteInstructions wri final Pair, List> ret = partitionDataFromPaths(tableSpec, partitionPaths); partitionData = ret.getFirst(); final List dhTableUpdateStrings = ret.getSecond(); - writeParquet(partitionData, dhTableUpdateStrings, writeInstructions); + parquetFileInfo = writeParquet(partitionData, dhTableUpdateStrings, writeInstructions); } - final List dataFiles = dataFilesFromParquet(partitionData); - - // Clear the list to reuse it for the next write - parquetFilesWritten.clear(); - - return dataFiles; + return dataFilesFromParquet(parquetFileInfo, partitionData); } /** @@ -480,7 +460,7 @@ private static String generateRandomAlphabetString(final int length) { return stringBuilder.toString(); } - private void writeParquet( + private List writeParquet( @NotNull final List partitionDataList, @NotNull final List dhTableUpdateStrings, @NotNull final IcebergWriteInstructions writeInstructions) { @@ -496,6 +476,11 @@ private void writeParquet( Require.eqZero(dhTableUpdateStrings.size(), "dhTableUpdateStrings.size()"); } + final List parquetFilesWritten = new ArrayList<>(); + final ParquetInstructions.OnWriteCompleted onWriteCompleted = parquetFilesWritten::add; + final ParquetInstructions parquetInstructions = tableWriterOptions.toParquetInstructions( + onWriteCompleted, tableDefinition, fieldIdToColumnName, specialInstructions); + // Write the data to parquet files for (int idx = 0; idx < dhTables.size(); idx++) { Table dhTable = dhTables.get(idx); @@ -513,6 +498,7 @@ private void writeParquet( // TODO (deephaven-core#6343): Set writeDefault() values for required columns that not present in the table ParquetTools.writeTable(dhTable, newDataLocation, parquetInstructions); } + return parquetFilesWritten; } /** @@ -551,6 +537,7 @@ private void commit( * Generate a list of {@link DataFile} objects from a list of parquet files written. 
*/ private List dataFilesFromParquet( + @NotNull final List parquetFilesWritten, @NotNull final List partitionDataList) { final int numFiles = parquetFilesWritten.size(); final List dataFiles = new ArrayList<>(numFiles); From 54cbd1ae33e7d2faca1f065bfc8cd7022046afe5 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 7 Mar 2025 18:56:56 +0100 Subject: [PATCH 08/10] Minor tweaks --- .../java/io/deephaven/iceberg/util/IcebergTableWriter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java index da601a709dd..ee8fbeebc09 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java @@ -305,7 +305,6 @@ public List writeDataFiles(@NotNull final IcebergWriteInstructions wri verifyCompatible(writeInstructions.tables(), nonPartitioningTableDefinition); final List partitionPaths = writeInstructions.partitionPaths(); verifyPartitionPaths(tableSpec, partitionPaths); - final List partitionData; final List parquetFileInfo; // Start a new query scope to avoid polluting the existing query scope with new parameters added for @@ -460,6 +459,7 @@ private static String generateRandomAlphabetString(final int length) { return stringBuilder.toString(); } + @NotNull private List writeParquet( @NotNull final List partitionDataList, @NotNull final List dhTableUpdateStrings, @@ -476,7 +476,7 @@ private List writeParquet( Require.eqZero(dhTableUpdateStrings.size(), "dhTableUpdateStrings.size()"); } - final List parquetFilesWritten = new ArrayList<>(); + final List parquetFilesWritten = new ArrayList<>(dhTables.size()); final ParquetInstructions.OnWriteCompleted onWriteCompleted = parquetFilesWritten::add; final ParquetInstructions parquetInstructions = tableWriterOptions.toParquetInstructions( onWriteCompleted, tableDefinition, fieldIdToColumnName, specialInstructions); From 76e61a0265d92f92b0b9eee9cfd071293c7d74e2 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 7 Mar 2025 18:58:58 +0100 Subject: [PATCH 09/10] Some more touchups --- .../main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java | 1 + .../main/java/io/deephaven/iceberg/util/IcebergTableWriter.java | 1 + 2 files changed, 2 insertions(+) diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java index e232a7ad741..8b5909c8617 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java @@ -167,6 +167,7 @@ public IcebergBaseLayout( } this.parquetInstructions = builder.build(); } + uriSchemeTochannelsProviders = new HashMap<>(); uriSchemeTochannelsProviders.put(uriScheme, SeekableChannelsProviderLoader.getInstance().load(uriScheme, specialInstructions)); diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java index ee8fbeebc09..2e50dbc6a2a 100644 --- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java +++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/util/IcebergTableWriter.java @@ -476,6 +476,7 @@ private List 
writeParquet( Require.eqZero(dhTableUpdateStrings.size(), "dhTableUpdateStrings.size()"); } + // Build the parquet instructions final List parquetFilesWritten = new ArrayList<>(dhTables.size()); final ParquetInstructions.OnWriteCompleted onWriteCompleted = parquetFilesWritten::add; final ParquetInstructions parquetInstructions = tableWriterOptions.toParquetInstructions( From a09b8ef176cb81a6cb673436fee3461338ae8ec2 Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Fri, 7 Mar 2025 20:29:25 +0100 Subject: [PATCH 10/10] Minor tweaks to Python --- py/server/deephaven/experimental/iceberg.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/py/server/deephaven/experimental/iceberg.py b/py/server/deephaven/experimental/iceberg.py index 9b56c0666c4..e914d866ea6 100644 --- a/py/server/deephaven/experimental/iceberg.py +++ b/py/server/deephaven/experimental/iceberg.py @@ -108,7 +108,7 @@ def __init__(self, will have that definition. This is useful for specifying a subset of the Iceberg schema columns. data_instructions (Optional[s3.S3Instructions]): Special instructions for reading data files, useful when reading files from a non-local file system, like S3. If omitted, the data instructions will be derived - from the properties of the catalog. + from the catalog. column_renames (Optional[Dict[str, str]]): A dictionary of old to new column names that will be renamed in the output table. update_mode (Optional[IcebergUpdateMode]): The update mode for the table. If omitted, the default update @@ -316,7 +316,7 @@ def __init__(self, `None`, which means use 2^20 bytes (1 MiB) data_instructions (Optional[s3.S3Instructions]): Special instructions for writing data files, useful when writing files to a non-local file system, like S3. If omitted, the data instructions will be derived - from the properties of the catalog. + from the catalog. Raises: DHError: If unable to build the object.
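
Taken together, the series makes data instructions optional on the write path: when they are omitted from the writer options, the writer derives them from the catalog's properties and the table location's URI scheme. Below is a minimal usage sketch, adapted from the new appendTableWithAndWithoutDataInstructionsTest in SqliteCatalogBase; the tableAdapter and source variables are assumptions carried over from that test.

// A hedged sketch, not a verbatim excerpt from the series: build a writer without
// explicit data instructions and let them be derived from the catalog.
// `tableAdapter` and `source` are assumed from the SqliteCatalogBase test above.
final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableParquetWriterOptions.builder()
        .tableDefinition(source.getDefinition())
        .build());
tableWriter.append(IcebergWriteInstructions.builder()
        .addTables(source)
        .build());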