feat: DH-18149: Simplified passing s3 instructions for iceberg writing #6668

Open · wants to merge 11 commits into base: main
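Before the diff, a quick sketch of what this PR enables. This is an illustration, not code from the PR: it assumes an existing `IcebergTableAdapter` in `tableAdapter` and a `TableDefinition` in `definition`, and uses the builder APIs that appear in the changes below.

import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.iceberg.util.IcebergTableWriter;
import io.deephaven.iceberg.util.TableParquetWriterOptions;

// Previously: S3 details had to be attached to each writer explicitly.
final IcebergTableWriter explicitWriter = tableAdapter.tableWriter(
        TableParquetWriterOptions.builder()
                .tableDefinition(definition)
                .dataInstructions(S3Instructions.builder().regionName("us-east-1").build())
                .build());

// With this change: omit dataInstructions(); the writer derives them from
// the catalog's properties and the table's URI scheme (e.g. "s3").
final IcebergTableWriter derivedWriter = tableAdapter.tableWriter(
        TableParquetWriterOptions.builder()
                .tableDefinition(definition)
                .build());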
@@ -5,6 +5,8 @@

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableDefinition;
+ import io.deephaven.extensions.s3.S3Instructions;
+ import io.deephaven.parquet.table.CompletedParquetWrite;
import io.deephaven.parquet.table.ParquetInstructions;
import org.junit.jupiter.api.Test;

@@ -144,8 +146,12 @@ void toParquetInstructionTest() {
ColumnDefinition.ofInt("PC2").withPartitioning(),
ColumnDefinition.ofLong("I"));
final Map<Integer, String> fieldIdToName = Map.of(2, "field2", 3, "field3");
+ final S3Instructions s3Instructions = S3Instructions.builder().regionName("test-region").build();
+ final ParquetInstructions.OnWriteCompleted onWriteCompleted =
+ (final CompletedParquetWrite completedParquetWrite) -> {
+ /* Do nothing */ };
final ParquetInstructions parquetInstructions = writeInstructions.toParquetInstructions(
- null, definition, fieldIdToName);
+ onWriteCompleted, definition, fieldIdToName, s3Instructions);

assertThat(parquetInstructions.getCompressionCodecName()).isEqualTo("GZIP");
assertThat(parquetInstructions.getMaximumDictionaryKeys()).isEqualTo(100);
@@ -154,7 +160,8 @@ void toParquetInstructionTest() {
assertThat(parquetInstructions.getFieldId("field1")).isEmpty();
assertThat(parquetInstructions.getFieldId("field2")).hasValue(2);
assertThat(parquetInstructions.getFieldId("field3")).hasValue(3);
- assertThat(parquetInstructions.onWriteCompleted()).isEmpty();
+ assertThat(parquetInstructions.onWriteCompleted()).hasValue(onWriteCompleted);
assertThat(parquetInstructions.getTableDefinition()).hasValue(definition);
+ assertThat(parquetInstructions.getSpecialInstructions()).isEqualTo(s3Instructions);
}
}
@@ -131,15 +131,15 @@ public static <T> Stream<T> toStream(final org.apache.iceberg.io.CloseableIterab
});
}

- private static String path(String path, FileIO io) {
+ private static String path(@NotNull final String path, @NotNull final FileIO io) {
return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path;
}

- public static URI locationUri(Table table) {
+ public static URI locationUri(@NotNull final Table table) {
return FileUtils.convertToURI(path(table.location(), table.io()), true);
}

- public static URI dataFileUri(Table table, DataFile dataFile) {
+ public static URI dataFileUri(@NotNull final Table table, @NotNull final DataFile dataFile) {
return FileUtils.convertToURI(path(dataFile.path().toString(), table.io()), false);
}

@@ -4,6 +4,7 @@
package io.deephaven.iceberg.internal;

import org.jetbrains.annotations.NotNull;
+ import org.jetbrains.annotations.Nullable;

import java.util.ArrayList;
import java.util.List;
@@ -82,6 +83,7 @@ private DataInstructionsProviderLoader(
* @param uriScheme The URI scheme
* @return A data instructions object for the given URI scheme or null if one cannot be found
*/
+ @Nullable
public Object load(@NotNull final String uriScheme) {
for (final DataInstructionsProviderPlugin plugin : providers) {
final Object pluginInstructions = plugin.createInstructions(uriScheme, properties);
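To make the scheme-based lookup concrete, here is a hypothetical plugin sketch. It assumes the `createInstructions(uriScheme, properties)` shape visible in the loop above and an assumed package for `DataInstructionsProviderPlugin`; the actual interface (annotations, property keys) may differ.

import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.iceberg.internal.DataInstructionsProviderPlugin;
import java.util.Map;

// Hypothetical sketch only; mirrors the createInstructions(uriScheme, properties)
// call made by DataInstructionsProviderLoader.load above.
public final class ExampleS3ProviderPlugin implements DataInstructionsProviderPlugin {
    @Override
    public Object createInstructions(final String uriScheme, final Map<String, String> properties) {
        if (!"s3".equals(uriScheme)) {
            return null; // not our scheme; another plugin (or nothing) handles it
        }
        final S3Instructions.Builder builder = S3Instructions.builder();
        // "client.region" is an assumed property key, used here for illustration
        if (properties.containsKey("client.region")) {
            builder.regionName(properties.get("client.region"));
        }
        return builder.build();
    }
}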
@@ -31,7 +31,6 @@

import static io.deephaven.iceberg.base.IcebergUtils.allManifestFiles;
import static io.deephaven.iceberg.base.IcebergUtils.dataFileUri;
- import static io.deephaven.iceberg.base.IcebergUtils.locationUri;

public abstract class IcebergBaseLayout implements TableLocationKeyFinder<IcebergTableLocationKey> {
/**
@@ -71,11 +70,6 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder<Iceber
*/
final TableDefinition tableDef;

- /**
- * The URI scheme from the Table {@link Table#location() location}.
- */
- private final String uriScheme;

/**
* The {@link Snapshot} from which to discover data files.
*/
@@ -123,6 +117,8 @@ protected IcebergTableLocationKey locationKey(
/**
* @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
* @param instructions The instructions for customizations while reading.
+ * @param dataInstructionsProvider The provider for special instructions, used when special instructions are not
+ * provided in the {@code instructions}.
*/
public IcebergBaseLayout(
@NotNull final IcebergTableAdapter tableAdapter,
@@ -147,7 +143,8 @@ public IcebergBaseLayout(

this.snapshot = tableAdapter.getSnapshot(instructions);
this.tableDef = tableAdapter.definition(instructions);
- this.uriScheme = locationUri(tableAdapter.icebergTable()).getScheme();

+ final String uriScheme = tableAdapter.locationUri().getScheme();
// Add the data instructions if provided as part of the IcebergReadInstructions, or else attempt to create
// data instructions from the properties collection and URI scheme.
final Object specialInstructions = instructions.dataInstructions()
@@ -22,6 +22,8 @@ public final class IcebergFlatLayout extends IcebergBaseLayout {
/**
* @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
* @param instructions The instructions for customizations while reading.
+ * @param dataInstructionsProvider The provider for special instructions, used when special instructions are not
+ * provided in the {@code instructions}.
*/
public IcebergFlatLayout(
@NotNull final IcebergTableAdapter tableAdapter,
@@ -42,6 +42,8 @@ private IdentityPartitioningColData(String name, Class<?> type, int index) {
* @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
* @param partitionSpec The Iceberg {@link PartitionSpec partition spec} for the table.
* @param instructions The instructions for customizations while reading.
+ * @param dataInstructionsProvider The provider for special instructions, used when special instructions are not
+ * provided in the {@code instructions}.
*/
public IcebergKeyValuePartitionedLayout(
@NotNull final IcebergTableAdapter tableAdapter,
@@ -37,7 +37,8 @@ public static Builder builder() {

/**
* The data instructions to use for reading the Iceberg data files (might be S3Instructions or other cloud
- * provider-specific instructions).
+ * provider-specific instructions). If not provided, data instructions will be derived from the properties of the
+ * catalog.
*/
public abstract Optional<Object> dataInstructions();

@@ -19,6 +19,7 @@
import io.deephaven.engine.table.impl.sources.regioned.RegionedTableComponentFactoryImpl;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.engine.util.TableTools;
+ import io.deephaven.iceberg.base.IcebergUtils;
import io.deephaven.iceberg.base.IcebergUtils.SpecAndSchema;
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
import io.deephaven.iceberg.layout.*;
@@ -38,6 +39,7 @@
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

+ import java.net.URI;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
@@ -61,6 +63,8 @@ public class IcebergTableAdapter {
private final TableIdentifier tableIdentifier;
private final DataInstructionsProviderLoader dataInstructionsProviderLoader;

+ private final URI locationUri;

public IcebergTableAdapter(
final Catalog catalog,
final TableIdentifier tableIdentifier,
@@ -70,6 +74,7 @@ public IcebergTableAdapter(
this.table = table;
this.tableIdentifier = tableIdentifier;
this.dataInstructionsProviderLoader = dataInstructionsProviderLoader;
+ this.locationUri = IcebergUtils.locationUri(table);
}

/**
@@ -584,6 +589,13 @@ private static TableDefinition fromSchema(
* @return A new instance of {@link IcebergTableWriter} configured with the provided options.
*/
public IcebergTableWriter tableWriter(final TableWriterOptions tableWriterOptions) {
- return new IcebergTableWriter(tableWriterOptions, this);
+ return new IcebergTableWriter(tableWriterOptions, this, dataInstructionsProviderLoader);
}

+ /**
+ * Get the location URI of the Iceberg table.
+ */
+ public URI locationUri() {
+ return locationUri;
+ }
}
@@ -12,6 +12,7 @@
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.locations.TableDataException;
+ import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
import io.deephaven.parquet.table.CompletedParquetWrite;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;
@@ -101,15 +102,30 @@ public class IcebergTableWriter {
*/
private final OutputFileFactory outputFileFactory;

+ /**
+ * The special instructions to use for writing the Iceberg data files (might be S3Instructions or other cloud
+ * provider-specific instructions).
+ */
+ private final Object specialInstructions;

/**
* Characters to be used for generating random variable names of length {@link #VARIABLE_NAME_LENGTH}.
*/
private static final String CHARACTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
private static final int VARIABLE_NAME_LENGTH = 6;

+ /**
+ * Create a new Iceberg table writer instance.
+ *
+ * @param tableWriterOptions The options to configure the behavior of this writer instance.
+ * @param tableAdapter The Iceberg table adapter corresponding to the Iceberg table to write to.
+ * @param dataInstructionsProvider The provider for special instructions, used when special instructions are not
+ * provided in the {@code tableWriterOptions}.
+ */
IcebergTableWriter(
final TableWriterOptions tableWriterOptions,
- final IcebergTableAdapter tableAdapter) {
+ final IcebergTableAdapter tableAdapter,
+ final DataInstructionsProviderLoader dataInstructionsProvider) {
this.tableWriterOptions = verifyWriterOptions(tableWriterOptions);
this.table = tableAdapter.icebergTable();

@@ -131,6 +147,11 @@ public class IcebergTableWriter {
outputFileFactory = OutputFileFactory.builderFor(table, 0, 0)
.format(FileFormat.PARQUET)
.build();

+ final String uriScheme = tableAdapter.locationUri().getScheme();
+ this.specialInstructions = tableWriterOptions.dataInstructions()
+ .orElseGet(() -> dataInstructionsProvider.load(uriScheme));

}

private static TableParquetWriterOptions verifyWriterOptions(
@@ -459,7 +480,7 @@ private List<CompletedParquetWrite> writeParquet(
final List<CompletedParquetWrite> parquetFilesWritten = new ArrayList<>(dhTables.size());
final ParquetInstructions.OnWriteCompleted onWriteCompleted = parquetFilesWritten::add;
final ParquetInstructions parquetInstructions = tableWriterOptions.toParquetInstructions(
- onWriteCompleted, tableDefinition, fieldIdToColumnName);
+ onWriteCompleted, tableDefinition, fieldIdToColumnName, specialInstructions);

// Write the data to parquet files
for (int idx = 0; idx < dhTables.size(); idx++) {
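Taken together, the writer changes implement a two-step fallback with a quiet third case: if neither the options nor a scheme-matched plugin yields instructions, `specialInstructions` stays null and `toParquetInstructions` (next file) simply sets nothing, which is the local-filesystem path. A condensed restatement of the diff's logic, not new API:

// Fallback order implemented across this file and TableParquetWriterOptions:
//   1. explicit dataInstructions() on the writer options
//   2. instructions derived from catalog properties via the URI scheme
//   3. null => no special instructions (e.g. local filesystem)
final Object specialInstructions = tableWriterOptions.dataInstructions()   // 1
        .orElseGet(() -> dataInstructionsProvider.load(uriScheme));        // 2, may yield null
if (specialInstructions != null) {
    builder.setSpecialInstructions(specialInstructions);                   // 3: skipped when null
}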
@@ -8,6 +8,7 @@
import io.deephaven.parquet.table.ParquetInstructions;
import org.immutables.value.Value;
import org.jetbrains.annotations.NotNull;
+ import org.jetbrains.annotations.Nullable;

import java.util.Map;

@@ -71,14 +72,18 @@ public int targetPageSize() {
* @param onWriteCompleted The callback to be invoked after writing the parquet file.
* @param tableDefinition The table definition to be populated inside the parquet file's schema
* @param fieldIdToName Mapping of field id to field name, to be populated inside the parquet file's schema
+ * @param specialInstructions Special instructions to be passed to the parquet writer
*/
ParquetInstructions toParquetInstructions(
@NotNull final ParquetInstructions.OnWriteCompleted onWriteCompleted,
@NotNull final TableDefinition tableDefinition,
- @NotNull final Map<Integer, String> fieldIdToName) {
+ @NotNull final Map<Integer, String> fieldIdToName,
+ @Nullable final Object specialInstructions) {
final ParquetInstructions.Builder builder = new ParquetInstructions.Builder();

- dataInstructions().ifPresent(builder::setSpecialInstructions);
+ if (specialInstructions != null) {
+ builder.setSpecialInstructions(specialInstructions);
+ }

// Add parquet writing specific instructions.
builder.setTableDefinition(tableDefinition);
@@ -24,8 +24,9 @@ public abstract class TableWriterOptions {
public abstract TableDefinition tableDefinition();

/**
- * The data instructions to use for reading/writing the Iceberg data files (might be S3Instructions or other cloud
- * provider-specific instructions).
+ * The data instructions to use for writing the Iceberg data files (might be S3Instructions or other cloud
+ * provider-specific instructions). If not provided, data instructions will be derived from the properties of the
+ * catalog.
*/
public abstract Optional<Object> dataInstructions();

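To show where those derived instructions come from, a sketch of the catalog-properties path. The property keys and the `IcebergTools.createAdapter` entry point are assumptions for illustration, not verified against this PR:

import io.deephaven.iceberg.util.IcebergCatalogAdapter;
import io.deephaven.iceberg.util.IcebergTools;
import java.util.Map;

// Assumed property keys (illustrative): a REST catalog plus S3 configuration.
final Map<String, String> properties = Map.of(
        "type", "rest",
        "uri", "http://my-catalog:8181",
        "client.region", "us-east-1");
// Hypothetical factory call; the real entry point may differ.
final IcebergCatalogAdapter catalogAdapter = IcebergTools.createAdapter("my_catalog", properties);
// Writers created from tables in this catalog can omit dataInstructions():
// the loader resolves the table's URI scheme against these same properties.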
@@ -55,6 +55,8 @@
import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;
import static io.deephaven.engine.util.TableTools.col;
import static io.deephaven.engine.util.TableTools.doubleCol;
+ import static io.deephaven.engine.util.TableTools.intCol;
+ import static io.deephaven.engine.util.TableTools.longCol;
import static org.apache.parquet.schema.LogicalTypeAnnotation.intType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
@@ -1036,4 +1038,43 @@ void testAutoRefreshingPartitionedAppend() throws InterruptedException {
final Table expected2 = TableTools.merge(expected, part3.update("PC = `cat`"));
assertTableEquals(expected2, fromIcebergRefreshing.select());
}

@Test
void appendTableWithAndWithoutDataInstructionsTest() {
final Table source = TableTools.newTable(
intCol("intCol", 15, 0, 32, 33, 19),
doubleCol("doubleCol", 10.5, 2.5, 3.5, 40.5, 0.5),
longCol("longCol", 20L, 50L, 0L, 10L, 5L));
final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable");
final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, source.getDefinition());
{
// Following will add data instructions to the table writer
final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder()
.tableDefinition(source.getDefinition())
.build());
tableWriter.append(IcebergWriteInstructions.builder()
.addTables(source)
.build());
}

Table fromIceberg = tableAdapter.table();
Table expected = source;
assertTableEquals(expected, fromIceberg);
verifySnapshots(tableIdentifier, List.of("append"));

{
// Skip adding the data instructions to the table writer, should derive them from the catalog
final IcebergTableWriter tableWriter = tableAdapter.tableWriter(TableParquetWriterOptions.builder()
.tableDefinition(source.getDefinition())
.build());
tableWriter.append(IcebergWriteInstructions.builder()
.addTables(source)
.build());
}

fromIceberg = tableAdapter.table();
expected = TableTools.merge(source, source);
assertTableEquals(expected, fromIceberg);
verifySnapshots(tableIdentifier, List.of("append", "append"));
}
}
6 changes: 5 additions & 1 deletion py/server/deephaven/experimental/iceberg.py
@@ -107,7 +107,8 @@ def __init__(self,
the definition is inferred from the Iceberg schema. Setting a definition guarantees the returned table
will have that definition. This is useful for specifying a subset of the Iceberg schema columns.
data_instructions (Optional[s3.S3Instructions]): Special instructions for reading data files, useful when
- reading files from a non-local file system, like S3.
+ reading files from a non-local file system, like S3. If omitted, the data instructions will be derived
+ from the catalog.
column_renames (Optional[Dict[str, str]]): A dictionary of old to new column names that will be renamed in
the output table.
update_mode (Optional[IcebergUpdateMode]): The update mode for the table. If omitted, the default update
@@ -313,6 +314,9 @@ def __init__(self,
`None`, which means use 2^20 (1,048,576)
target_page_size (Optional[int]): the target Parquet file page size in bytes, if not specified. Defaults to
`None`, which means use 2^20 bytes (1 MiB)
+ data_instructions (Optional[s3.S3Instructions]): Special instructions for writing data files, useful when
+ writing files to a non-local file system, like S3. If omitted, the data instructions will be derived
+ from the catalog.

Raises:
DHError: If unable to build the object.