feat: DH-18149: Simplified passing s3 instructions for iceberg writing #6668

Open
wants to merge 11 commits into base: main
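For orientation, here is a minimal caller-side sketch of what this change simplifies. The package paths, the immutables-generated builder setters, and the `tableWriter` entry point are assumptions inferred from the snippets below, not taken verbatim from this PR:

import io.deephaven.engine.table.TableDefinition;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import io.deephaven.iceberg.util.IcebergTableWriter;
import io.deephaven.iceberg.util.TableParquetWriterOptions;

public final class S3WriteSketch {

    // Before: callers had to build S3Instructions themselves and attach them to the writer options.
    static IcebergTableWriter explicitS3Writer(final IcebergTableAdapter tableAdapter,
            final TableDefinition definition) {
        final S3Instructions s3 = S3Instructions.builder()
                .regionName("us-east-1") // illustrative region
                .build();
        return tableAdapter.tableWriter(TableParquetWriterOptions.builder()
                .tableDefinition(definition)
                .dataInstructions(s3) // explicit data instructions
                .build());
    }

    // After this change: dataInstructions() may be omitted; IcebergTableWriter derives them from
    // the catalog properties for the table's URI scheme via DataInstructionsProviderLoader.
    static IcebergTableWriter derivedS3Writer(final IcebergTableAdapter tableAdapter,
            final TableDefinition definition) {
        return tableAdapter.tableWriter(TableParquetWriterOptions.builder()
                .tableDefinition(definition)
                .build());
    }
}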
@@ -5,6 +5,8 @@

import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.parquet.table.CompletedParquetWrite;
import io.deephaven.parquet.table.ParquetInstructions;
import org.junit.jupiter.api.Test;

@@ -144,8 +146,12 @@ void toParquetInstructionTest() {
ColumnDefinition.ofInt("PC2").withPartitioning(),
ColumnDefinition.ofLong("I"));
final Map<Integer, String> fieldIdToName = Map.of(2, "field2", 3, "field3");
final S3Instructions s3Instructions = S3Instructions.builder().regionName("test-region").build();
final ParquetInstructions.OnWriteCompleted onWriteCompleted =
(final CompletedParquetWrite completedParquetWrite) -> {
/* Do nothing */ };
final ParquetInstructions parquetInstructions = writeInstructions.toParquetInstructions(
null, definition, fieldIdToName);
onWriteCompleted, definition, fieldIdToName, s3Instructions);

assertThat(parquetInstructions.getCompressionCodecName()).isEqualTo("GZIP");
assertThat(parquetInstructions.getMaximumDictionaryKeys()).isEqualTo(100);
@@ -154,7 +160,8 @@ void toParquetInstructionTest() {
assertThat(parquetInstructions.getFieldId("field1")).isEmpty();
assertThat(parquetInstructions.getFieldId("field2")).hasValue(2);
assertThat(parquetInstructions.getFieldId("field3")).hasValue(3);
assertThat(parquetInstructions.onWriteCompleted()).isEmpty();
assertThat(parquetInstructions.onWriteCompleted()).hasValue(onWriteCompleted);
assertThat(parquetInstructions.getTableDefinition()).hasValue(definition);
assertThat(parquetInstructions.getSpecialInstructions()).isEqualTo(s3Instructions);
}
}
@@ -131,15 +131,15 @@ public static <T> Stream<T> toStream(final org.apache.iceberg.io.CloseableIterab
});
}

private static String path(String path, FileIO io) {
private static String path(@NotNull final String path, @NotNull final FileIO io) {
return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path;
}

public static URI locationUri(Table table) {
public static URI locationUri(@NotNull final Table table) {
return FileUtils.convertToURI(path(table.location(), table.io()), true);
}

public static URI dataFileUri(Table table, DataFile dataFile) {
public static URI dataFileUri(@NotNull final Table table, @NotNull final DataFile dataFile) {
return FileUtils.convertToURI(path(dataFile.path().toString(), table.io()), false);
}

@@ -4,6 +4,7 @@
package io.deephaven.iceberg.internal;

import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.ArrayList;
import java.util.List;
@@ -82,6 +83,7 @@ private DataInstructionsProviderLoader(
* @param uriScheme The URI scheme
* @return A data instructions object for the given URI scheme or null if one cannot be found
*/
@Nullable
public Object load(@NotNull final String uriScheme) {
for (final DataInstructionsProviderPlugin plugin : providers) {
final Object pluginInstructions = plugin.createInstructions(uriScheme, properties);
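The now-@Nullable result is meant to be consumed with an explicit fallback; this is the pattern the writer adopts later in this diff (a sketch reusing the writer's own names — tableWriterOptions, dataInstructionsProvider, uriScheme — nothing new beyond them):

// User-supplied data instructions win; otherwise ask the provider plugins for the table's
// URI scheme. load(...) may return null, in which case no special instructions are set and
// the parquet layer falls back to its defaults.
final Object specialInstructions = tableWriterOptions.dataInstructions()
        .orElseGet(() -> dataInstructionsProvider.load(uriScheme));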
@@ -31,7 +31,6 @@

import static io.deephaven.iceberg.base.IcebergUtils.allManifestFiles;
import static io.deephaven.iceberg.base.IcebergUtils.dataFileUri;
import static io.deephaven.iceberg.base.IcebergUtils.locationUri;

public abstract class IcebergBaseLayout implements TableLocationKeyFinder<IcebergTableLocationKey> {
/**
@@ -123,6 +122,8 @@ protected IcebergTableLocationKey locationKey(
/**
* @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
* @param instructions The instructions for customizations while reading.
* @param dataInstructionsProvider The provider for special instructions, used when special instructions are not
*        provided in the {@code instructions}.
*/
public IcebergBaseLayout(
@NotNull final IcebergTableAdapter tableAdapter,
@@ -147,7 +148,7 @@ public IcebergBaseLayout(

this.snapshot = tableAdapter.getSnapshot(instructions);
this.tableDef = tableAdapter.definition(instructions);
this.uriScheme = locationUri(tableAdapter.icebergTable()).getScheme();
this.uriScheme = tableAdapter.getScheme();
// Add the data instructions if provided as part of the IcebergReadInstructions, or else attempt to create
// data instructions from the properties collection and URI scheme.
final Object specialInstructions = instructions.dataInstructions()
@@ -170,7 +171,6 @@
}
this.parquetInstructions = builder.build();
}

uriSchemeTochannelsProviders = new HashMap<>();
uriSchemeTochannelsProviders.put(uriScheme,
SeekableChannelsProviderLoader.getInstance().load(uriScheme, specialInstructions));
@@ -22,6 +22,8 @@ public final class IcebergFlatLayout extends IcebergBaseLayout {
/**
* @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
* @param instructions The instructions for customizations while reading.
* @param dataInstructionsProvider The provider for special instructions, used when special instructions are not
*        provided in the {@code instructions}.
*/
public IcebergFlatLayout(
@NotNull final IcebergTableAdapter tableAdapter,
@@ -42,6 +42,8 @@ private IdentityPartitioningColData(String name, Class<?> type, int index) {
* @param tableAdapter The {@link IcebergTableAdapter} that will be used to access the table.
* @param partitionSpec The Iceberg {@link PartitionSpec partition spec} for the table.
* @param instructions The instructions for customizations while reading.
* @param dataInstructionsProvider The provider for special instructions, used when special instructions are not
*        provided in the {@code instructions}.
*/
public IcebergKeyValuePartitionedLayout(
@NotNull final IcebergTableAdapter tableAdapter,
@@ -37,7 +37,8 @@ public static Builder builder() {

/**
* The data instructions to use for reading the Iceberg data files (might be S3Instructions or other cloud
* provider-specific instructions).
* provider-specific instructions). If not provided, data instructions will be derived from the properties of the
* catalog.
*/
public abstract Optional<Object> dataInstructions();

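A read-side illustration of the documented fallback (a hypothetical sketch; the builder setters and the tableAdapter.table(...) call are assumptions about the surrounding API, not part of this diff):

import io.deephaven.engine.table.Table;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.iceberg.util.IcebergReadInstructions;
import io.deephaven.iceberg.util.IcebergTableAdapter;

final class ReadSketch {
    // Explicit data instructions continue to work.
    static Table readWithExplicitS3(final IcebergTableAdapter tableAdapter) {
        return tableAdapter.table(IcebergReadInstructions.builder()
                .dataInstructions(S3Instructions.builder().regionName("us-east-1").build())
                .build());
    }

    // They can also be omitted; data instructions are then derived from the catalog
    // properties for the table's URI scheme.
    static Table readWithDerivedInstructions(final IcebergTableAdapter tableAdapter) {
        return tableAdapter.table(IcebergReadInstructions.builder().build());
    }
}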
@@ -43,6 +43,7 @@
import java.util.stream.Collectors;

import static io.deephaven.iceberg.base.IcebergUtils.convertToDHType;
import static io.deephaven.iceberg.base.IcebergUtils.locationUri;

/**
* This class manages an Iceberg {@link org.apache.iceberg.Table table} and provides methods to interact with it.
@@ -61,6 +62,11 @@ public class IcebergTableAdapter {
private final TableIdentifier tableIdentifier;
private final DataInstructionsProviderLoader dataInstructionsProviderLoader;

/**
* The URI scheme for the table location.
*/
private final String uriScheme;

public IcebergTableAdapter(
final Catalog catalog,
final TableIdentifier tableIdentifier,
@@ -70,6 +76,7 @@ public IcebergTableAdapter(
this.table = table;
this.tableIdentifier = tableIdentifier;
this.dataInstructionsProviderLoader = dataInstructionsProviderLoader;
this.uriScheme = locationUri(table).getScheme();
}

/**
@@ -584,6 +591,13 @@ private static TableDefinition fromSchema(
* @return A new instance of {@link IcebergTableWriter} configured with the provided options.
*/
public IcebergTableWriter tableWriter(final TableWriterOptions tableWriterOptions) {
return new IcebergTableWriter(tableWriterOptions, this);
return new IcebergTableWriter(tableWriterOptions, this, dataInstructionsProviderLoader);
}

/**
* Get the URI scheme for the table location.
*/
public String getScheme() {
return uriScheme;
}
}
@@ -12,6 +12,7 @@
import io.deephaven.engine.table.Table;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
import io.deephaven.parquet.table.CompletedParquetWrite;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.parquet.table.ParquetTools;
@@ -101,15 +102,35 @@ public class IcebergTableWriter {
*/
private final OutputFileFactory outputFileFactory;

/**
* The instructions to use for writing the parquet files, populated up-front and used for all writes.
*/
private final ParquetInstructions parquetInstructions;

/**
* The list of completed writes to parquet files. This list is populated by the callback provided in the parquet
* instructions and is cleared after each write operation so it can be reused for the next write.
*/
private final List<CompletedParquetWrite> parquetFilesWritten;

/**
* Characters to be used for generating random variable names of length {@link #VARIABLE_NAME_LENGTH}.
*/
private static final String CHARACTERS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
private static final int VARIABLE_NAME_LENGTH = 6;

/**
* Create a new Iceberg table writer instance.
*
* @param tableWriterOptions The options to configure the behavior of this writer instance.
* @param tableAdapter The Iceberg table adapter corresponding to the Iceberg table to write to.
* @param dataInstructionsProvider The provider for special instructions, used when special instructions are not
*        provided in the {@code tableWriterOptions}.
*/
IcebergTableWriter(
final TableWriterOptions tableWriterOptions,
final IcebergTableAdapter tableAdapter) {
final IcebergTableAdapter tableAdapter,
final DataInstructionsProviderLoader dataInstructionsProvider) {
this.tableWriterOptions = verifyWriterOptions(tableWriterOptions);
this.table = tableAdapter.icebergTable();

@@ -131,6 +152,17 @@ public class IcebergTableWriter {
outputFileFactory = OutputFileFactory.builderFor(table, 0, 0)
.format(FileFormat.PARQUET)
.build();

final String uriScheme = tableAdapter.getScheme();
final Object specialInstructions = tableWriterOptions.dataInstructions()
.orElseGet(() -> dataInstructionsProvider.load(uriScheme));

// Build the parquet instructions
parquetFilesWritten = new ArrayList<>();
final ParquetInstructions.OnWriteCompleted onWriteCompleted = parquetFilesWritten::add;
parquetInstructions = this.tableWriterOptions.toParquetInstructions(
onWriteCompleted, tableDefinition, fieldIdToColumnName, specialInstructions);

}

private static TableParquetWriterOptions verifyWriterOptions(
@@ -284,18 +316,28 @@ public List<DataFile> writeDataFiles(@NotNull final IcebergWriteInstructions wri
verifyCompatible(writeInstructions.tables(), nonPartitioningTableDefinition);
final List<String> partitionPaths = writeInstructions.partitionPaths();
verifyPartitionPaths(tableSpec, partitionPaths);

if (!parquetFilesWritten.isEmpty()) {
throw new IllegalStateException("List of parquet files written should be empty before writing new data, " +
"found " + parquetFilesWritten.size() + " files");
}

final List<PartitionData> partitionData;
final List<CompletedParquetWrite> parquetFileInfo;
// Start a new query scope to avoid polluting the existing query scope with new parameters added for
// partitioning columns
try (final SafeCloseable _ignore =
ExecutionContext.getContext().withQueryScope(new StandaloneQueryScope()).open()) {
final Pair<List<PartitionData>, List<String[]>> ret = partitionDataFromPaths(tableSpec, partitionPaths);
partitionData = ret.getFirst();
final List<String[]> dhTableUpdateStrings = ret.getSecond();
parquetFileInfo = writeParquet(partitionData, dhTableUpdateStrings, writeInstructions);
writeParquet(partitionData, dhTableUpdateStrings, writeInstructions);
}
return dataFilesFromParquet(parquetFileInfo, partitionData);
final List<DataFile> dataFiles = dataFilesFromParquet(partitionData);

// Clear the list to reuse it for the next write
parquetFilesWritten.clear();

return dataFiles;
}

/**
Expand Down Expand Up @@ -438,8 +480,7 @@ private static String generateRandomAlphabetString(final int length) {
return stringBuilder.toString();
}

@NotNull
private List<CompletedParquetWrite> writeParquet(
private void writeParquet(
@NotNull final List<PartitionData> partitionDataList,
@NotNull final List<String[]> dhTableUpdateStrings,
@NotNull final IcebergWriteInstructions writeInstructions) {
@@ -455,12 +496,6 @@ private List<CompletedParquetWrite> writeParquet(
Require.eqZero(dhTableUpdateStrings.size(), "dhTableUpdateStrings.size()");
}

// Build the parquet instructions
final List<CompletedParquetWrite> parquetFilesWritten = new ArrayList<>(dhTables.size());
final ParquetInstructions.OnWriteCompleted onWriteCompleted = parquetFilesWritten::add;
final ParquetInstructions parquetInstructions = tableWriterOptions.toParquetInstructions(
onWriteCompleted, tableDefinition, fieldIdToColumnName);

// Write the data to parquet files
for (int idx = 0; idx < dhTables.size(); idx++) {
Table dhTable = dhTables.get(idx);
@@ -478,7 +513,6 @@ private List<CompletedParquetWrite> writeParquet(
// TODO (deephaven-core#6343): Set writeDefault() values for required columns that are not present in the table
ParquetTools.writeTable(dhTable, newDataLocation, parquetInstructions);
}
return parquetFilesWritten;
}

/**
@@ -517,7 +551,6 @@ private void commit(
* Generate a list of {@link DataFile} objects from a list of parquet files written.
*/
private List<DataFile> dataFilesFromParquet(
@NotNull final List<CompletedParquetWrite> parquetFilesWritten,
@NotNull final List<PartitionData> partitionDataList) {
final int numFiles = parquetFilesWritten.size();
final List<DataFile> dataFiles = new ArrayList<>(numFiles);
@@ -8,6 +8,7 @@
import io.deephaven.parquet.table.ParquetInstructions;
import org.immutables.value.Value;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.Map;

@@ -71,14 +72,18 @@ public int targetPageSize() {
* @param onWriteCompleted The callback to be invoked after writing the parquet file.
* @param tableDefinition The table definition to be populated inside the parquet file's schema
* @param fieldIdToName Mapping of field id to field name, to be populated inside the parquet file's schema
* @param specialInstructions Special instructions to be passed to the parquet writer
*/
ParquetInstructions toParquetInstructions(
@NotNull final ParquetInstructions.OnWriteCompleted onWriteCompleted,
@NotNull final TableDefinition tableDefinition,
@NotNull final Map<Integer, String> fieldIdToName) {
@NotNull final Map<Integer, String> fieldIdToName,
@Nullable final Object specialInstructions) {
final ParquetInstructions.Builder builder = new ParquetInstructions.Builder();

dataInstructions().ifPresent(builder::setSpecialInstructions);
if (specialInstructions != null) {
builder.setSpecialInstructions(specialInstructions);
}

// Add parquet writing specific instructions.
builder.setTableDefinition(tableDefinition);
@@ -24,8 +24,9 @@ public abstract class TableWriterOptions {
public abstract TableDefinition tableDefinition();

/**
* The data instructions to use for reading/writing the Iceberg data files (might be S3Instructions or other cloud
* provider-specific instructions).
* The data instructions to use for writing the Iceberg data files (might be S3Instructions or other cloud
* provider-specific instructions). If not provided, data instructions will be derived from the properties of the
* catalog.
*/
public abstract Optional<Object> dataInstructions();
