Skip to content

Commit

Permalink
Review with Devin contd.
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam committed Nov 22, 2024
1 parent d0a09fa commit 776432a
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ class TableParquetWriterOptionsTest {
* Create a new TableParquetWriterOptions builder with an empty table definition.
*/
private static TableParquetWriterOptions.Builder instructions() {
return TableParquetWriterOptions.builder().tableDefinition(TableDefinition.of());
return TableParquetWriterOptions.builder().tableDefinition(TableDefinition.of(
ColumnDefinition.ofInt("someCol")));
}

@Test
Expand All @@ -45,6 +46,18 @@ void testSetTableDefinition() {
.isEqualTo(definition);
}

@Test
void testEmptyTableDefinition() {
try {
TableParquetWriterOptions.builder()
.tableDefinition(TableDefinition.of())
.build();
failBecauseExceptionWasNotThrown(IllegalArgumentException.class);
} catch (IllegalArgumentException e) {
assertThat(e).hasMessageContaining("table definition");
}
}

@Test
void testSetCompressionCodecName() {
assertThat(instructions()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ public class IcebergTableWriter {
*/
private final org.apache.iceberg.Table table;

/**
* Store the partition spec of the Iceberg table at the time of creation of this writer instance and use it for all
* writes, so that even if the table spec, the writer will still work.
*/
private final PartitionSpec tableSpec;

/**
* The table definition used for all writes by this writer instance.
*/
Expand Down Expand Up @@ -102,9 +108,11 @@ public class IcebergTableWriter {
this.tableWriterOptions = verifyWriterOptions(tableWriterOptions);
this.table = tableAdapter.icebergTable();

this.tableSpec = table.spec();

this.tableDefinition = tableWriterOptions.tableDefinition();
verifyRequiredFields(table.schema(), tableDefinition);
verifyPartitioningColumns(table.spec(), tableDefinition);
verifyPartitioningColumns(tableSpec, tableDefinition);

this.userSchema = ((SchemaProviderImpl) tableWriterOptions.schemaProvider()).getSchema(table);
verifyFieldIdsInSchema(tableWriterOptions.fieldIdToColumnName().keySet(), userSchema);
Expand Down Expand Up @@ -246,14 +254,14 @@ public void append(@NotNull final IcebergWriteInstructions writeInstructions) {
*/
public List<DataFile> writeDataFiles(@NotNull final IcebergWriteInstructions writeInstructions) {
final List<String> partitionPaths = writeInstructions.partitionPaths();
verifyPartitionPaths(table, partitionPaths);
verifyPartitionPaths(tableSpec, partitionPaths);
final List<PartitionData> partitionData;
final List<CompletedParquetWrite> parquetFileInfo;
// Start a new query scope to avoid polluting the existing query scope with new parameters added for
// partitioning columns
try (final SafeCloseable _ignore =
ExecutionContext.getContext().withQueryScope(new StandaloneQueryScope()).open()) {
final Pair<List<PartitionData>, List<String[]>> ret = partitionDataFromPaths(table.spec(), partitionPaths);
final Pair<List<PartitionData>, List<String[]>> ret = partitionDataFromPaths(tableSpec, partitionPaths);
partitionData = ret.getFirst();
final List<String[]> dhTableUpdateStrings = ret.getSecond();
parquetFileInfo = writeParquet(partitionData, dhTableUpdateStrings, writeInstructions);
Expand All @@ -262,12 +270,12 @@ public List<DataFile> writeDataFiles(@NotNull final IcebergWriteInstructions wri
}

private static void verifyPartitionPaths(
final org.apache.iceberg.Table icebergTable,
final PartitionSpec partitionSpec,
final Collection<String> partitionPaths) {
if (icebergTable.spec().isPartitioned() && partitionPaths.isEmpty()) {
if (partitionSpec.isPartitioned() && partitionPaths.isEmpty()) {
throw new IllegalArgumentException("Cannot write data to a partitioned table without partition paths.");
}
if (!icebergTable.spec().isPartitioned() && !partitionPaths.isEmpty()) {
if (!partitionSpec.isPartitioned() && !partitionPaths.isEmpty()) {
throw new IllegalArgumentException("Cannot write data to an un-partitioned table with partition paths.");
}
}
Expand Down Expand Up @@ -395,7 +403,7 @@ private List<CompletedParquetWrite> writeParquet(
@NotNull final List<String[]> dhTableUpdateStrings,
@NotNull final IcebergWriteInstructions writeInstructions) {
final List<Table> dhTables = writeInstructions.tables();
final boolean isPartitioned = table.spec().isPartitioned();
final boolean isPartitioned = tableSpec.isPartitioned();
if (isPartitioned) {
Require.eq(dhTables.size(), "dhTables.size()",
partitionDataList.size(), "partitionDataList.size()");
Expand Down Expand Up @@ -436,7 +444,7 @@ private List<CompletedParquetWrite> writeParquet(
* Generate the location string for a new data file for the given partition data.
*/
private String getDataLocation(@NotNull final PartitionData partitionData) {
final EncryptedOutputFile outputFile = outputFileFactory.newOutputFile(table.spec(), partitionData);
final EncryptedOutputFile outputFile = outputFileFactory.newOutputFile(tableSpec, partitionData);
return outputFile.encryptingOutputFile().location();
}

Expand Down Expand Up @@ -472,7 +480,7 @@ private List<DataFile> dataFilesFromParquet(
@NotNull final List<PartitionData> partitionDataList) {
final int numFiles = parquetFilesWritten.size();
final List<DataFile> dataFiles = new ArrayList<>(numFiles);
final PartitionSpec partitionSpec = table.spec();
final PartitionSpec partitionSpec = tableSpec;
for (int idx = 0; idx < numFiles; idx++) {
final CompletedParquetWrite completedWrite = parquetFilesWritten.get(idx);
final DataFiles.Builder dataFileBuilder = DataFiles.builder(partitionSpec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,11 @@ final void checkOneToOneMapping() {
columnNames.add(columnName);
}
}

@Value.Check
final void checkNonEmptyDefinition() {
if (tableDefinition().numColumns() == 0) {
throw new IllegalArgumentException("Cannot write to an Iceberg table using empty table definition");
}
}
}

0 comments on commit 776432a

Please sign in to comment.