Skip to content

Add additional properties when doing conversion for hudi #714

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.xtable.service;

import static org.apache.xtable.conversion.ConversionUtils.convertToSourceTable;
import static org.apache.xtable.hudi.HudiSourceConfig.PARTITION_FIELD_SPEC_CONFIG;
import static org.apache.xtable.model.storage.TableFormat.DELTA;
import static org.apache.xtable.model.storage.TableFormat.HUDI;
import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
Expand All @@ -27,6 +28,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import lombok.extern.log4j.Log4j2;

Expand Down Expand Up @@ -186,20 +188,34 @@ public ConversionService(
* @return a ConvertTableResponse containing details of the converted target tables
*/
public ConvertTableResponse convertTable(ConvertTableRequest convertTableRequest) {

Properties sourceProperties = new Properties();
if (convertTableRequest.getConfigurations() != null) {
String partitionSpec =
convertTableRequest.getConfigurations().getOrDefault("partition-spec", null);
if (partitionSpec != null) {
sourceProperties.put(PARTITION_FIELD_SPEC_CONFIG, partitionSpec);
}
}

SourceTable sourceTable =
SourceTable.builder()
.name(convertTableRequest.getSourceTableName())
.basePath(convertTableRequest.getSourceTablePath())
.dataPath(convertTableRequest.getSourceDataPath())
.formatName(convertTableRequest.getSourceFormat())
.additionalProperties(sourceProperties)
.build();

List<TargetTable> targetTables = new ArrayList<>();
for (String targetFormat : convertTableRequest.getTargetFormats()) {
TargetTable targetTable =
TargetTable.builder()
.name(convertTableRequest.getSourceTableName())
.basePath(convertTableRequest.getSourceTablePath())
// set the metadata path to the data path as the default (required by Hudi)
.basePath(convertTableRequest.getSourceDataPath())
.formatName(targetFormat)
.additionalProperties(sourceProperties)
.build();
targetTables.add(targetTable);
}
Expand All @@ -220,7 +236,7 @@ public ConvertTableResponse convertTable(ConvertTableRequest convertTableRequest
String schemaString = extractSchemaString(targetTable, internalTable);
convertedTables.add(
ConvertedTable.builder()
.targetFormat(internalTable.getName())
.targetFormat(internalTable.getTableFormat())
.targetSchema(schemaString)
.targetMetadataPath(internalTable.getLatestMetdataPath())
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ public class ConvertTableRequest {
@JsonProperty("source-table-path")
private String sourceTablePath;

@JsonProperty("source-data-path")
private String sourceDataPath;

@JsonProperty("target-formats")
private List<String> targetFormats;

Expand All @@ -52,12 +55,14 @@ public ConvertTableRequest(
@JsonProperty("source-format") String sourceFormat,
@JsonProperty("source-table-name") String sourceTableName,
@JsonProperty("source-table-path") String sourceTablePath,
@JsonProperty("source-data-path") String sourceDataPath,
@JsonProperty("target-format") List<String> targetFormat,
@JsonProperty("configurations") Map<String, String> configurations) {

this.sourceFormat = sourceFormat;
this.sourceTableName = sourceTableName;
this.sourceTablePath = sourceTablePath;
this.sourceDataPath = sourceDataPath;
this.targetFormats = targetFormat;
this.configurations = configurations;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
class TestConversionService {
private static final String SOURCE_NAME = "users";
private static final String SOURCE_PATH = "s3://bucket/tables/users";
private static final String SOURCE_DATA_PATH = "s3://bucket/tables/users/data";
private static final String HUDI_META_PATH = "s3://bucket/tables/users/.hoodie";
private static final String ICEBERG_META_PATH =
"s3://bucket/tables/users/metadata/v1.metadata.json";
Expand Down Expand Up @@ -111,6 +112,7 @@ void convertToTargetHudi() {
.sourceFormat(TableFormat.DELTA)
.sourceTableName(SOURCE_NAME)
.sourceTablePath(SOURCE_PATH)
.sourceDataPath(SOURCE_DATA_PATH)
.targetFormats(Collections.singletonList(TableFormat.HUDI))
.build();

Expand All @@ -120,7 +122,7 @@ void convertToTargetHudi() {
when(provider.getConversionSourceInstance(any())).thenReturn(conversionSrc);
when(conversionSrc.getCurrentTable()).thenReturn(internalTbl);

when(internalTbl.getName()).thenReturn(TableFormat.HUDI);
when(internalTbl.getTableFormat()).thenReturn(TableFormat.HUDI);
when(internalTbl.getLatestMetdataPath()).thenReturn(HUDI_META_PATH);
when(internalTbl.getReadSchema()).thenReturn(internalSchema);

Expand All @@ -146,6 +148,7 @@ void convertToTargetIceberg() {
.sourceFormat(TableFormat.DELTA)
.sourceTableName(SOURCE_NAME)
.sourceTablePath(SOURCE_PATH)
.sourceDataPath(SOURCE_DATA_PATH)
.targetFormats(Collections.singletonList(TableFormat.ICEBERG))
.build();

Expand All @@ -157,7 +160,7 @@ void convertToTargetIceberg() {
when(provider.getConversionSourceInstance(any())).thenReturn(conversionSrc);
when(conversionSrc.getCurrentTable()).thenReturn(internalTbl);

when(internalTbl.getName()).thenReturn(TableFormat.ICEBERG);
when(internalTbl.getTableFormat()).thenReturn(TableFormat.ICEBERG);
when(internalTbl.getLatestMetdataPath()).thenReturn(ICEBERG_META_PATH);
when(internalTbl.getReadSchema()).thenReturn(internalSchema);

Expand Down Expand Up @@ -185,6 +188,7 @@ void convertToTargetDelta() {
.sourceFormat(TableFormat.ICEBERG)
.sourceTableName(SOURCE_NAME)
.sourceTablePath(SOURCE_PATH)
.sourceDataPath(SOURCE_DATA_PATH)
.targetFormats(Collections.singletonList(TableFormat.DELTA))
.build();

Expand All @@ -194,7 +198,7 @@ void convertToTargetDelta() {
when(provider.getConversionSourceInstance(any())).thenReturn(conversionSrc);
when(conversionSrc.getCurrentTable()).thenReturn(internalTbl);

when(internalTbl.getName()).thenReturn(TableFormat.DELTA);
when(internalTbl.getTableFormat()).thenReturn(TableFormat.DELTA);
when(internalTbl.getLatestMetdataPath()).thenReturn(DELTA_META_PATH);
when(internalTbl.getReadSchema()).thenReturn(internalSchema);

Expand Down