diff --git a/pom.xml b/pom.xml
index 3184a4be2..7359e5a67 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,12 +47,12 @@
-        <module>xtable-api</module>
-        <module>xtable-hudi-support</module>
+        <module>xtable-api</module>
+
         <module>xtable-core</module>
-        <module>xtable-utilities</module>
-        <module>xtable-aws</module>
-        <module>xtable-hive-metastore</module>
+
@@ -304,6 +304,41 @@
             <version>${delta.hive.version}</version>
+
+            <dependency>
+                <groupId>org.apache.parquet</groupId>
+                <artifactId>parquet</artifactId>
+                <version>1.13.0</version>
+                <type>pom</type>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.parquet</groupId>
+                <artifactId>parquet-format</artifactId>
+                <version>2.10.0</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.parquet</groupId>
+                <artifactId>parquet-hadoop</artifactId>
+                <version>1.13.0</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.parquet</groupId>
+                <artifactId>parquet-column</artifactId>
+                <version>1.13.0</version>
+            </dependency>
+
+            <dependency>
+                <groupId>org.apache.parquet</groupId>
+                <artifactId>parquet-common</artifactId>
+                <version>1.13.0</version>
+            </dependency>
                 <groupId>org.apache.spark</groupId>
@@ -794,7 +829,7 @@
-                    <version>${google.java.format.version}</version>
+                    <version>1.19.2</version>
com.google.googlejavaformat:google-java-format
diff --git a/xtable-api/pom.xml b/xtable-api/pom.xml
index 43aa7bace..ab6c87020 100644
--- a/xtable-api/pom.xml
+++ b/xtable-api/pom.xml
@@ -29,6 +29,12 @@
     <name>XTable Project API</name>
+
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-column</artifactId>
+            <version>1.13.0</version>
+        </dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-annotations</artifactId>
diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
index 63e9d6733..b6145032d 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java
@@ -28,6 +28,7 @@
import com.google.common.base.Preconditions;
+import org.apache.xtable.model.config.InputPartitionFields;
import org.apache.xtable.model.sync.SyncMode;
@Value
@@ -42,13 +43,16 @@ public class ConversionConfig {
   Map<TargetTable, List<TargetCatalogTableIdentifier>> targetCatalogs;
// The mode, incremental or snapshot
SyncMode syncMode;
+ // Input partition config for parquet
+ InputPartitionFields partitions;
@Builder
ConversionConfig(
@NonNull SourceTable sourceTable,
       List<TargetTable> targetTables,
       Map<TargetTable, List<TargetCatalogTableIdentifier>> targetCatalogs,
-      SyncMode syncMode) {
+      SyncMode syncMode,
+      InputPartitionFields partitions) {
this.sourceTable = sourceTable;
this.targetTables = targetTables;
Preconditions.checkArgument(
@@ -56,5 +60,6 @@ public class ConversionConfig {
"Please provide at-least one format to sync");
this.targetCatalogs = targetCatalogs == null ? Collections.emptyMap() : targetCatalogs;
this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode;
+ this.partitions = partitions;
}
}
diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
index 939c59c09..a6c97d8fa 100644
--- a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
+++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java
@@ -34,12 +34,16 @@
class ExternalTable {
/** The name of the table. */
protected final @NonNull String name;
+
/** The format of the table (e.g. DELTA, ICEBERG, HUDI) */
protected final @NonNull String formatName;
+
/** The path to the root of the table or the metadata directory depending on the format */
protected final @NonNull String basePath;
+
/** Optional namespace for the table */
protected final String[] namespace;
+
/** The configuration for interacting with the catalog that manages this table */
protected final CatalogConfig catalogConfig;
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/catalog/HierarchicalTableIdentifier.java b/xtable-api/src/main/java/org/apache/xtable/model/catalog/HierarchicalTableIdentifier.java
index e6d7cf0eb..2dfc2cd89 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/catalog/HierarchicalTableIdentifier.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/catalog/HierarchicalTableIdentifier.java
@@ -23,12 +23,18 @@
* table names. Some catalogs may omit the catalog name.
*/
public interface HierarchicalTableIdentifier extends CatalogTableIdentifier {
- /** @return the catalog name if present, otherwise null */
+ /**
+ * @return the catalog name if present, otherwise null
+ */
String getCatalogName();
- /** @return the database (or schema) name; required */
+ /**
+ * @return the database (or schema) name; required
+ */
String getDatabaseName();
- /** @return the table name; required */
+ /**
+ * @return the table name; required
+ */
String getTableName();
}
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java b/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java
index 2608d36a3..f387c7d36 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java
@@ -42,6 +42,7 @@ public class ThreePartHierarchicalTableIdentifier implements HierarchicalTableId
* name varies depending on the catalogType.
*/
String catalogName;
+
/**
* Catalogs have the ability to group tables logically, databaseName is the identifier for such
* logical classification. The alternate names for this field include namespace, schemaName etc.
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/config/InputPartitionField.java b/xtable-api/src/main/java/org/apache/xtable/model/config/InputPartitionField.java
new file mode 100644
index 000000000..2d1ce83b2
--- /dev/null
+++ b/xtable-api/src/main/java/org/apache/xtable/model/config/InputPartitionField.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.model.config;
+
+import lombok.Builder;
+import lombok.Value;
+
+import org.apache.xtable.model.schema.PartitionTransformType;
+
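+/**
+ * One user-declared partition column for a parquet source, e.g. a field named "year" with
+ * transform type YEAR and value "2024" (illustrative values only; the config file is parsed
+ * in RunSync#getPartitionsFromUserConfiguration).
+ */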
+@Value
+@Builder(toBuilder = true)
+public class InputPartitionField {
+ String partitionFieldName;
+ String partitionValue;
+ PartitionTransformType transformType;
+}
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/config/InputPartitionFields.java b/xtable-api/src/main/java/org/apache/xtable/model/config/InputPartitionFields.java
new file mode 100644
index 000000000..10eb27369
--- /dev/null
+++ b/xtable-api/src/main/java/org/apache/xtable/model/config/InputPartitionFields.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.model.config;
+
+import java.util.List;
+
+import lombok.Builder;
+import lombok.Value;
+
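+/**
+ * The full partition configuration for a parquet source: the source field plus one entry per
+ * partition column. A minimal YAML sketch, assuming Jackson's default field-name mapping
+ * (illustrative, not a fixed contract):
+ *
+ * <pre>
+ * sourceField: event_date
+ * partitions:
+ *   - partitionFieldName: year
+ *     partitionValue: "2024"
+ *     transformType: YEAR
+ * </pre>
+ */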
+@Value
+@Builder(toBuilder = true)
+public class InputPartitionFields {
+ String sourceField;
+  List<InputPartitionField> partitions;
+}
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java
index 16b6da8aa..31eb0ed41 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalField.java
@@ -43,9 +43,11 @@ public class InternalField {
// The id field for the field. This is used to identify the field in the schema even after
// renames.
Integer fieldId;
+
// represents the fully qualified path to the field (dot separated)
@Getter(lazy = true)
String path = createPath();
+
// splits the dot separated path into parts
@Getter(lazy = true)
String[] pathParts = splitPath();
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
index 20af37e0c..8b7e0fc59 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
@@ -75,7 +75,8 @@ public enum MetadataKey {
public enum MetadataValue {
MICROS,
- MILLIS
+ MILLIS,
+ NANOS
}
public static final String XTABLE_LOGICAL_TYPE = "xtableLogicalType";
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/stat/ColumnStat.java b/xtable-api/src/main/java/org/apache/xtable/model/stat/ColumnStat.java
index 4e579418a..943d3b4f5 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/stat/ColumnStat.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/stat/ColumnStat.java
@@ -17,7 +17,7 @@
*/
package org.apache.xtable.model.stat;
-
+import java.util.Set;
import lombok.Builder;
import lombok.Value;
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
index 3aee766ef..de6e98fb9 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
@@ -49,4 +49,8 @@ public class InternalDataFile {
   @Builder.Default @NonNull List<ColumnStat> columnStats = Collections.emptyList();
// last modified time in millis since epoch
long lastModified;
+
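+  /**
+   * Returns a builder seeded from an existing file so callers can copy it and override selected
+   * attributes, e.g. {@code InternalDataFile.builderFrom(file).fileSizeBytes(123L).build()}.
+   */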
+ public static InternalDataFileBuilder builderFrom(InternalDataFile dataFile) {
+ return dataFile.toBuilder();
+ }
}
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
index bea0b4774..ad727210e 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
@@ -27,8 +27,9 @@ public class TableFormat {
public static final String HUDI = "HUDI";
public static final String ICEBERG = "ICEBERG";
public static final String DELTA = "DELTA";
+ public static final String PARQUET = "PARQUET";
public static String[] values() {
-    return new String[] {"HUDI", "ICEBERG", "DELTA"};
+    return new String[] {HUDI, ICEBERG, DELTA, PARQUET};
}
}
diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml
index 24bc31df5..9bf18bee9 100644
--- a/xtable-core/pom.xml
+++ b/xtable-core/pom.xml
@@ -57,6 +57,12 @@
+        <dependency>
+            <groupId>org.apache.parquet</groupId>
+            <artifactId>parquet-avro</artifactId>
+            <version>1.15.0</version>
+        </dependency>
             <groupId>org.apache.avro</groupId>
             <artifactId>avro</artifactId>
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
new file mode 100644
index 000000000..bcb352ad6
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import lombok.Builder;
+import lombok.NonNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.util.functional.RemoteIterators;
+
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.xtable.model.CommitsBacklog;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.InternalSnapshot;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.config.InputPartitionField;
+import org.apache.xtable.model.config.InputPartitionFields;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+  private static final ParquetSchemaExtractor schemaExtractor =
+      ParquetSchemaExtractor.getInstance();
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+  private static final ParquetPartitionValueExtractor partitionValueExtractor =
+      ParquetPartitionValueExtractor.getInstance();
+  private static final ParquetStatsExtractor parquetStatsExtractor =
+      ParquetStatsExtractor.getInstance();
+
+  private final InputPartitionFields partitions;
+  private final String tableName;
+  private final String basePath;
+  // user-supplied config path describing the partition layout of the parquet files
+  private final String configPath;
+  @NonNull private final Configuration hadoopConf;
+
+  private InputPartitionFields initPartitionInfo() {
+    return partitions;
+  }
+
+
+  /**
+   * Infers the table schema from the most recently listed file, on the assumption that the
+   * latest file carries the newest fields.
+   *
+   * @param modificationTime the commit to consider for reading the table state
+   * @return the table state derived from the latest parquet file
+   */
+  @Override
+  public InternalTable getTable(Long modificationTime) {
+    List<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+    // TODO pick the last file by modification time instead of listing order
+    LocatedFileStatus latestFile = parquetFiles.get(parquetFiles.size() - 1);
+
+    ParquetMetadata parquetMetadata =
+        parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.getPath());
+    MessageType tableSchema = parquetMetadataExtractor.getSchema(parquetMetadata);
+
+    List<String> partitionKeys =
+        initPartitionInfo().getPartitions().stream()
+            .map(InputPartitionField::getPartitionFieldName)
+            .collect(Collectors.toList());
+
+    // merge the partition fields into the schema since they are not part of the parquet file
+    if (!partitionKeys.isEmpty()) {
+      // TODO mergeParquetSchema(tableSchema, partitionKeys) does not compile yet
+    }
+    InternalSchema schema = schemaExtractor.toInternalSchema(tableSchema, null, null);
+
+    List<InternalPartitionField> partitionFields =
+        partitionKeys.isEmpty()
+            ? Collections.emptyList()
+            : partitionValueExtractor.getInternalPartitionFields(partitions);
+    DataLayoutStrategy dataLayoutStrategy =
+        partitionFields.isEmpty()
+            ? DataLayoutStrategy.FLAT
+            : DataLayoutStrategy.HIVE_STYLE_PARTITION;
+    return InternalTable.builder()
+        .tableFormat(TableFormat.PARQUET)
+        .basePath(basePath)
+        .name(tableName)
+        .layoutStrategy(dataLayoutStrategy)
+        .partitioningFields(partitionFields)
+        .readSchema(schema)
+        .latestCommitTime(Instant.ofEpochMilli(latestFile.getModificationTime()))
+        .build();
+  }
+
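+  /**
+   * Maps every parquet file under the base path to an {@link InternalDataFile}, attaching the
+   * config-derived partition values and the per-file column stats read from each footer.
+   */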
+  public List<InternalDataFile> getInternalDataFiles() {
+    List<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+    List<PartitionValue> partitionValuesFromConfig =
+        partitionValueExtractor.createPartitionValues(
+            partitionValueExtractor.extractPartitionValues(partitions));
+    return parquetFiles.stream()
+        .map(
+            file ->
+                InternalDataFile.builder()
+                    .physicalPath(file.getPath().toString())
+                    .fileFormat(FileFormat.APACHE_PARQUET)
+                    .fileSizeBytes(file.getLen())
+                    .partitionValues(partitionValuesFromConfig)
+                    .lastModified(file.getModificationTime())
+                    .columnStats(
+                        parquetStatsExtractor.getColumnStatsForFile(
+                            parquetMetadataExtractor.readParquetMetadata(
+                                hadoopConf, file.getPath())))
+                    .build())
+        .collect(Collectors.toList());
+  }
+
+  // incremental sync is not yet implemented since parquet sources are tracked file-by-file
+  @Override
+  public CommitsBacklog<Long> getCommitsBacklog(InstantsForIncrementalSync lastSyncInstant) {
+    return null;
+  }
+
+  @Override
+  public TableChange getTableChangeForCommit(Long commit) {
+    return null;
+  }
+
+  @Override
+  public InternalTable getCurrentTable() {
+    return null;
+  }
+
+  /**
+   * Builds the current snapshot by listing all files; -1 is passed so {@link #getTable} reads
+   * the latest table state.
+   *
+   * @return the current snapshot including all discovered data files
+   */
+ @Override
+ public InternalSnapshot getCurrentSnapshot() {
+ List internalDataFiles = getInternalDataFiles();
+ InternalTable table = getTable(-1L);
+ return InternalSnapshot.builder()
+ .table(table)
+ .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
+ .build();
+ }
+
+ /* private Schema mergeAvroSchema(Schema internalSchema, Set parititonFields) {
+
+ SchemaBuilder.FieldAssembler fieldAssembler =
+ SchemaBuilder.record(internalSchema.getName()).fields();
+ for (Schema.Field field : internalSchema.getFields()) {
+ fieldAssembler = fieldAssembler.name(field.name()).type(field.schema()).noDefault();
+ }
+
+ for (String paritionKey : parititonFields) {
+ fieldAssembler = fieldAssembler.name(paritionKey).type().stringType().noDefault();
+ }
+
+ return fieldAssembler.endRecord();
+ }*/
+
+ /* private Type mergeParquetSchema(MessageType internalSchema, List parititonFields) {
+
+ List listOfAllFields = internalSchema.getFields();
+ Type fieldsToMerge = listOfAllFields.get(0);
+ listOfAllFields.remove(0);
+ // start the merge
+ for (Type field : internalSchema.getFields()) {
+ fieldsToMerge = fieldsToMerge.union(field,false);
+ }
+ */
+ /* for (String partition : parititonFields) {
+ //create Type from partiton, TODO: check further...
+ fieldsToMerge = fieldsToMerge.union(new Type(partition, Repetition.REQUIRED),false);
+ }*/
+ /*
+
+ return fieldsToMerge;
+ }*/
+  public List<LocatedFileStatus> getParquetFiles(Configuration hadoopConf, String basePath) {
+    try {
+      FileSystem fs = FileSystem.get(hadoopConf);
+      // recursively list all parquet files under the base path
+      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(basePath), true);
+      return RemoteIterators.toList(iterator).stream()
+          .filter(file -> file.getPath().getName().endsWith("parquet"))
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to list parquet files under " + basePath, e);
+    }
+  }
+
+  @Override
+  public boolean isIncrementalSyncSafeFrom(Instant instant) {
+    return false;
+  }
+
+  @Override
+  public void close() {}
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java
new file mode 100644
index 000000000..ba3bb3a07
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+
+/** A concrete implementation of {@link ConversionSourceProvider} for the Parquet file format. */
+public class ParquetConversionSourceProvider extends ConversionSourceProvider<Long> {
+ @Override
+ public ParquetConversionSource getConversionSourceInstance(SourceTable sourceTable) {
+
+    return ParquetConversionSource.builder()
+        .tableName(sourceTable.getName())
+        .basePath(sourceTable.getBasePath())
+        // TODO wire the user-supplied partition configuration through to the source
+        .hadoopConf(new Configuration())
+        .build();
+ }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
new file mode 100644
index 000000000..a0cfc4863
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+public class ParquetMetadataExtractor {
+
+  private static final ParquetMetadataExtractor INSTANCE = new ParquetMetadataExtractor();
+
+  public static ParquetMetadataExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public static MessageType getSchema(ParquetMetadata footer) {
+    return footer.getFileMetaData().getSchema();
+  }
+
+  public static ParquetMetadata readParquetMetadata(Configuration conf, Path path) {
+    try {
+      return ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to read parquet footer for " + path, e);
+    }
+  }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
new file mode 100644
index 000000000..c1b53b668
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Collections;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.xtable.model.config.InputPartitionField;
+import org.apache.xtable.model.config.InputPartitionFields;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+
+/**
+ * Partition value extractor for Parquet.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class ParquetPartitionValueExtractor {
+ private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+ private static final ParquetPartitionValueExtractor INSTANCE =
+ new ParquetPartitionValueExtractor();
+
+ public static ParquetPartitionValueExtractor getInstance() {
+ return INSTANCE;
+ }
+
+
+  public List<InternalPartitionField> convertFromParquetUserDefinedPartitionConfig(
+      InputPartitionFields userDefinedPartitionSchema) {
+    if (userDefinedPartitionSchema.getPartitions().isEmpty()) {
+      return Collections.emptyList();
+    }
+    return getInternalPartitionFields(userDefinedPartitionSchema);
+  }
+
+  public List<InternalPartitionField> getInternalPartitionFields(InputPartitionFields partitions) {
+    List<InternalPartitionField> partitionFields = new ArrayList<>();
+ String sourceField = partitions.getSourceField();
+ for (InputPartitionField partition : partitions.getPartitions()) {
+ partitionFields.add(
+ InternalPartitionField.builder()
+ // TODO check the sourceField dataType (from the user config of the partitions)
+ .sourceField(
+ InternalField.builder()
+ .name(sourceField)
+ .schema(
+ InternalSchema.builder()
+ .name(sourceField)
+ .dataType(InternalType.STRING)
+ .build())
+ .build())
+ .transformType(partition.getTransformType())
+ .build());
+ }
+ return partitionFields;
+ }
+
+  public List<PartitionValue> createPartitionValues(
+      Map<InternalPartitionField, Range> extractedPartitions) {
+ return extractedPartitions.entrySet()
+ .stream()
+ .map(internalPartitionField ->
+ PartitionValue.builder()
+ .partitionField(
+ internalPartitionField.getKey())
+ .range(internalPartitionField.getValue())
+ .build())
+ .collect(Collectors.toList());
+ }
+
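+  /**
+   * Derives a {@link Range} per partition field from the user config. Date-like transforms are
+   * interpreted as offsets from the epoch and stored as epoch millis, e.g. a MONTH transform
+   * with partitionValue "3" (illustrative) becomes epoch plus three months in millis.
+   */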
+  public Map<InternalPartitionField, Range> extractPartitionValues(
+      InputPartitionFields partitionsConf) {
+    Map<InternalPartitionField, Range> partitionValues = new HashMap<>();
+    for (InputPartitionField partitionField : partitionsConf.getPartitions()) {
+      Object value;
+      // Convert date based partitions into millis since epoch
+      switch (partitionField.getTransformType()) {
+        case YEAR:
+          value =
+              EPOCH.plusYears(Long.parseLong(partitionField.getPartitionValue()))
+                  .toInstant()
+                  .toEpochMilli();
+          break;
+        case MONTH:
+          value =
+              EPOCH.plusMonths(Long.parseLong(partitionField.getPartitionValue()))
+                  .toInstant()
+                  .toEpochMilli();
+          break;
+        case DAY:
+          value =
+              EPOCH.plusDays(Long.parseLong(partitionField.getPartitionValue()))
+                  .toInstant()
+                  .toEpochMilli();
+          break;
+        case HOUR:
+          value =
+              EPOCH.plusHours(Long.parseLong(partitionField.getPartitionValue()))
+                  .toInstant()
+                  .toEpochMilli();
+          break;
+        default:
+          value = partitionField.getPartitionValue();
+      }
+      InternalPartitionField internalPartitionField =
+          InternalPartitionField.builder()
+              .sourceField(
+                  InternalField.builder()
+                      .name(partitionsConf.getSourceField())
+                      .schema(
+                          InternalSchema.builder()
+                              .name(partitionsConf.getSourceField())
+                              // TODO derive the data type from the user partition config
+                              .dataType(InternalType.STRING)
+                              .build())
+                      .build())
+              .transformType(partitionField.getTransformType())
+              .build();
+      partitionValues.put(internalPartitionField, Range.scalar(value));
+    }
+    return partitionValues;
+  }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
new file mode 100644
index 000000000..031b98df4
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.avro.Schema;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+
+import org.apache.xtable.collectors.CustomCollectors;
+import org.apache.xtable.exception.UnsupportedSchemaTypeException;
+import org.apache.xtable.hudi.idtracking.models.IdMapping;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.schema.SchemaUtils;
+
+
+/**
+ * Class that converts parquet Schema {@link Schema} to Canonical Schema {@link InternalSchema} and
+ * vice-versa. This conversion is fully reversible and there is a strict 1 to 1 mapping between
+ * parquet data types and canonical data types.
+ */
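+// Example mapping performed below: an INT64 column annotated timestamp(MICROS) becomes
+// InternalType.TIMESTAMP with TIMESTAMP_PRECISION=MICROS metadata.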
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class ParquetSchemaExtractor {
+ // parquet only supports string keys in maps
+ private static final InternalField MAP_KEY_FIELD =
+ InternalField.builder()
+ .name(InternalField.Constants.MAP_KEY_FIELD_NAME)
+ .schema(
+ InternalSchema.builder()
+ .name("map_key")
+ .dataType(InternalType.STRING)
+ .isNullable(false)
+ .build())
+ .defaultValue("")
+ .build();
+ private static final ParquetSchemaExtractor INSTANCE = new ParquetSchemaExtractor();
+ private static final String ELEMENT = "element";
+ private static final String KEY = "key";
+ private static final String VALUE = "value";
+
+ public static ParquetSchemaExtractor getInstance() {
+ return INSTANCE;
+ }
+
+  private static boolean groupTypeContainsNull(GroupType schema) {
+    for (Type field : schema.getFields()) {
+      if (field.getLogicalTypeAnnotation() == null) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+ /* private static LogicalTypeAnnotation finalizeSchema(LogicalTypeAnnotation targetSchema, InternalSchema inputSchema) {
+ if (inputSchema.isNullable()) {
+ return targetSchema.union(null); // LogicalTypeAnnotation.unknownType()
+ }
+ return targetSchema;
+ }*/
+ private Map getChildIdMap(IdMapping idMapping) {
+ if (idMapping == null) {
+ return Collections.emptyMap();
+ }
+ return idMapping.getFields().stream()
+ .collect(Collectors.toMap(IdMapping::getName, Function.identity()));
+ }
+
+ /**
+ * Converts the parquet {@link Schema} to {@link InternalSchema}.
+ *
+ * @param schema The schema being converted
+ * @param parentPath If this schema is nested within another, this will be a dot separated string
+ * representing the path from the top most field to the current schema.
+ * @param fieldNameToIdMapping map of fieldName to IdMapping to track field IDs provided by the
+ * source schema. If source schema does not contain IdMappings, map will be empty.
+ * @return a converted schema
+ */
+ public InternalSchema toInternalSchema(
+ MessageType schema, String parentPath, Map fieldNameToIdMapping) {
+ // TODO - Does not handle recursion in parquet schema
+    InternalType newDataType = null;
+ PrimitiveType typeName;
+ LogicalTypeAnnotation logicalType;
+ Map metadata = new HashMap<>();
+ if (schema.isPrimitive()) {
+ typeName = schema.asPrimitiveType();
+ switch (typeName.getPrimitiveTypeName()) {
+ // PrimitiveTypes
+ case INT64:
+ logicalType = schema.getLogicalTypeAnnotation();
+ if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.TimeUnit timeUnit =
+ ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType).getUnit();
+ if (timeUnit == LogicalTypeAnnotation.TimeUnit.MICROS) {
+ newDataType = InternalType.TIMESTAMP;
+ metadata.put(
+ InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+ InternalSchema.MetadataValue.MICROS);
+ } else if (timeUnit == LogicalTypeAnnotation.TimeUnit.MILLIS) {
+ newDataType = InternalType.TIMESTAMP_NTZ;
+ metadata.put(
+ InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+ InternalSchema.MetadataValue.MILLIS);
+ } else if (timeUnit == LogicalTypeAnnotation.TimeUnit.NANOS) {
+ newDataType = InternalType.TIMESTAMP_NTZ;
+ metadata.put(
+ InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+ InternalSchema.MetadataValue.NANOS);
+ }
+ } else if (logicalType instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation) {
+ newDataType = InternalType.INT;
+ } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.TimeUnit timeUnit = ((LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType).getUnit();
+ if (timeUnit == LogicalTypeAnnotation.TimeUnit.MICROS || timeUnit == LogicalTypeAnnotation.TimeUnit.NANOS) {
+ // check if INT is the InternalType needed here
+ newDataType = InternalType.INT;
+ }
+ } else {
+ newDataType = InternalType.INT;
+ }
+ break;
+ case INT32:
+ logicalType = schema.getLogicalTypeAnnotation();
+ if (logicalType instanceof LogicalTypeAnnotation.DateLogicalTypeAnnotation) {
+ newDataType = InternalType.DATE;
+ } else if (logicalType instanceof LogicalTypeAnnotation.TimeLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.TimeUnit timeUnit = ((LogicalTypeAnnotation.TimeLogicalTypeAnnotation) logicalType).getUnit();
+ if (timeUnit == LogicalTypeAnnotation.TimeUnit.MILLIS) {
+ // check if INT is the InternalType needed here
+ newDataType = InternalType.INT;
+ }
+ } else {
+ newDataType = InternalType.INT;
+ }
+ break;
+ case INT96:
+ newDataType = InternalType.INT;
+ break;
+ case FLOAT:
+ logicalType = schema.getLogicalTypeAnnotation();
+ /* if (logicalType instanceof LogicalTypeAnnotation.Float16LogicalTypeAnnotation) {
+ newDataType = InternalType.FLOAT;
+ } else*/
+ if (logicalType
+ instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+ metadata.put(
+ InternalSchema.MetadataKey.DECIMAL_PRECISION,
+ ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getPrecision());
+ metadata.put(
+ InternalSchema.MetadataKey.DECIMAL_SCALE,
+ ((LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType).getScale());
+ newDataType = InternalType.DECIMAL;
+ } else {
+ newDataType = InternalType.FLOAT;
+ }
+ break;
+ case FIXED_LEN_BYTE_ARRAY:
+ logicalType = schema.getLogicalTypeAnnotation();
+ if (logicalType instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation) {
+ newDataType = InternalType.UUID;
+ } else if (logicalType instanceof LogicalTypeAnnotation.IntervalLogicalTypeAnnotation) {
+ metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, 12);
+ newDataType = InternalType.FIXED;
+ }
+ break;
+ // TODO add other logicalTypes?
+ case BINARY:
+ // ? Variant,GEOMETRY, GEOGRAPHY,
+ logicalType = schema.getLogicalTypeAnnotation();
+ if (logicalType instanceof LogicalTypeAnnotation.EnumLogicalTypeAnnotation) {
+ metadata.put(
+ InternalSchema.MetadataKey.ENUM_VALUES, logicalType.toOriginalType().values());
+ newDataType = InternalType.ENUM;
+ } else if (logicalType instanceof LogicalTypeAnnotation.JsonLogicalTypeAnnotation) {
+ newDataType = InternalType.BYTES;
+ } else if (logicalType instanceof LogicalTypeAnnotation.BsonLogicalTypeAnnotation) {
+ newDataType = InternalType.BYTES;
+ } else if (logicalType instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
+ newDataType = InternalType.STRING;
+ } else {
+ newDataType = InternalType.BYTES;
+ }
+ break;
+ case BOOLEAN:
+ newDataType = InternalType.BOOLEAN;
+ break;
+ /* case UNKNOWN:
+ newDataType = InternalType.NULL;
+ break;*/
+ default:
+ throw new UnsupportedSchemaTypeException(
+ String.format("Unsupported schema type %s", schema));
+ }
+ } else {
+ //GroupTypes
+ //typeName = schema.asGroupType();
+ switch (schema.getOriginalType()) {
+ case LIST:
+ List subFields = new ArrayList<>(schema.getFields().size());
+ for (Type parquetField : schema.getFields()) {
+ IdMapping idMapping = fieldNameToIdMapping.get(parquetField.getName());
+ InternalSchema subFieldSchema =
+ toInternalSchema(
+ new MessageType(parquetField.getName(), parquetField),
+ SchemaUtils.getFullyQualifiedPath(parentPath, parquetField.getName()),
+ getChildIdMap(idMapping));
+ //Object defaultValue = getDefaultValue(parquetField);
+ subFields.add(
+ InternalField.builder()
+ .parentPath(parentPath)
+ .name(parquetField.getName())
+ .schema(subFieldSchema)
+ //.defaultValue(defaultValue)
+ .fieldId(idMapping == null ? null : idMapping.getId())
+ .build());
+ }
+ return InternalSchema.builder()
+ .name(schema.getName())
+ //.comment(schema.getDoc())
+ .dataType(InternalType.RECORD)
+ .fields(subFields)
+ .isNullable(groupTypeContainsNull(schema))
+ .build();
+ /*case MAP:
+ IdMapping keyMapping = fieldNameToIdMapping.get(KEY);
+ IdMapping valueMapping = fieldNameToIdMapping.get(VALUE);
+ InternalSchema valueSchema =
+ toInternalSchema(
+ schema.getName(),
+ SchemaUtils.getFullyQualifiedPath(
+ parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME),
+ getChildIdMap(valueMapping));
+ InternalField valueField =
+ InternalField.builder()
+ .name(InternalField.Constants.MAP_VALUE_FIELD_NAME)
+ .parentPath(parentPath)
+ .schema(valueSchema)
+ .fieldId(valueMapping == null ? null : valueMapping.getId())
+ .build();
+ return InternalSchema.builder()
+ .name(schema.getName())
+ .dataType(InternalType.MAP)
+ .comment(schema.toString())
+ .isNullable(groupTypeContainsNull(schema))
+ .fields(
+ Arrays.asList(
+ MAP_KEY_FIELD.toBuilder()
+ .parentPath(parentPath)
+ .fieldId(keyMapping == null ? null : keyMapping.getId())
+ .build(),
+ valueField))
+ .build();*/
+ default:
+ throw new UnsupportedSchemaTypeException(
+ String.format("Unsupported schema type %s", schema));
+ }
+ }
+ return InternalSchema.builder()
+ .name(schema.getName())
+ .dataType(newDataType)
+ .comment(null)
+ .isNullable(groupTypeContainsNull(schema)) // to check
+ .metadata(metadata.isEmpty() ? null : metadata)
+ .build();
+ }
+
+
+ /**
+ * Internal method for converting the {@link InternalSchema} to parquet {@link Schema}.
+ *
+ * @param internalSchema internal schema representation
+ * @param currentPath If this schema is nested within another, this will be a dot separated
+ * string. This is used for the parquet namespace to guarantee unique names for nested
+ * records.
+ * @return an parquet schema
+ */
+ private LogicalTypeAnnotation fromInternalSchema(InternalSchema internalSchema, String currentPath) {
+ switch (internalSchema.getDataType()) {
+ /*case BYTES:
+ return finalizeSchema(Schema.create(Schema.Type.BYTES), internalSchema);*/
+      case BOOLEAN:
+        // parquet booleans carry no logical type annotation; an unsigned INT8 is a stand-in here
+        return LogicalTypeAnnotation.intType(8, false);
+ case INT:
+ return LogicalTypeAnnotation.intType(32, false);
+      case LONG:
+        return LogicalTypeAnnotation.intType(64, false);
+ case STRING:
+ return LogicalTypeAnnotation.stringType();
+ case FLOAT:
+ int precision =
+ (int)
+ internalSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION);
+ int scale =
+ (int)
+ internalSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE);
+ return LogicalTypeAnnotation.decimalType(scale, precision);
+ case ENUM:
+ return new org.apache.parquet.avro.AvroSchemaConverter().convert(Schema.createEnum(
+ internalSchema.getName(),
+ internalSchema.getComment(),
+ null,
+ (List)
+ internalSchema.getMetadata().get(InternalSchema.MetadataKey.ENUM_VALUES),
+ null)).getLogicalTypeAnnotation();
+ case DATE:
+ return LogicalTypeAnnotation.dateType();
+      case TIMESTAMP:
+        if (internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
+            == InternalSchema.MetadataValue.NANOS) {
+          return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.NANOS);
+        } else if (internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
+            == InternalSchema.MetadataValue.MILLIS) {
+          return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
+        } else {
+          // MICROS is treated as the default precision
+          return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
+        }
+ case TIMESTAMP_NTZ:
+ if (internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION)
+ == InternalSchema.MetadataValue.MICROS) {
+ return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS);
+
+ } else {
+ return LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
+
+ }
+ case RECORD:
+        List<LogicalTypeAnnotation> fields =
+ internalSchema.getFields().stream()
+ // TODO check if field is decimal then its metadata should be not null (below)
+ .map(
+ field ->
+ LogicalTypeAnnotation.fromOriginalType(
+ fromInternalSchema(
+ field.getSchema(),
+ SchemaUtils.getFullyQualifiedPath(field.getName(), currentPath)).toOriginalType(), null
+ ))
+                .collect(CustomCollectors.toList(internalSchema.getFields().size()));
+        // TODO assemble the collected field annotations into a group type; until then RECORD
+        // falls through to the unsupported-type exception below
+ default:
+ throw new UnsupportedSchemaTypeException("Encountered unhandled type during InternalSchema to parquet conversion:" + internalSchema.getDataType());
+ }
+ }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
new file mode 100644
index 000000000..d82805276
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import org.apache.xtable.model.schema.InternalSchema;
+
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.TreeSet;
+
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.Range;
+import lombok.Builder;
+import lombok.Value;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.hadoop.fs.*;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.config.InputPartitionFields;
+import org.apache.hadoop.conf.Configuration;
+
+@Value
+@Builder
+public class ParquetStatsExtractor {
+
+  // TODO the shared instance carries no partition config; prefer builder() when partitions matter
+  private static final ParquetStatsExtractor INSTANCE = new ParquetStatsExtractor(null);
+  private static final ParquetPartitionValueExtractor partitionExtractor =
+      ParquetPartitionValueExtractor.getInstance();
+  private static final ParquetSchemaExtractor schemaExtractor =
+      ParquetSchemaExtractor.getInstance();
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+  private static Map<ColumnDescriptor, ColStats> stats = new LinkedHashMap<>();
+  private static long recordCount = 0;
+ private final InputPartitionFields partitions;
+
+ public static ParquetStatsExtractor getInstance() {
+ return INSTANCE;
+ }
+
+  public static List<ColumnStat> getColumnStatsForFile(ParquetMetadata footer) {
+    return getStatsForFile(footer).values().stream()
+        .flatMap(List::stream)
+        .collect(Collectors.toList());
+  }
+
+
+  public static Map<ColumnDescriptor, List<ColumnStat>> getStatsForFile(ParquetMetadata footer) {
+    MessageType schema = parquetMetadataExtractor.getSchema(footer);
+    // group the column-chunk stats from all row groups by their column descriptor
+    return footer.getBlocks().stream()
+        .flatMap(blockMetaData -> blockMetaData.getColumns().stream())
+        .collect(
+            Collectors.groupingBy(
+                columnMetaData ->
+                    schema.getColumnDescription(columnMetaData.getPath().toArray()),
+                Collectors.mapping(
+                    columnMetaData ->
+                        ColumnStat.builder()
+                            .numValues(columnMetaData.getValueCount())
+                            .totalSize(columnMetaData.getTotalSize())
+                            .range(
+                                Range.vector(
+                                    columnMetaData.getStatistics().genericGetMin(),
+                                    columnMetaData.getStatistics().genericGetMax()))
+                            .build(),
+                    Collectors.toList())));
+  }
+
+ private InputPartitionFields initPartitionInfo() {
+ return partitions;
+ }
+
+  private InternalDataFile toInternalDataFile(
+      Configuration hadoopConf, Path parentPath, Map<ColumnDescriptor, ColumnStat> stats) {
+    FileStatus file;
+    List<PartitionValue> partitionValues;
+    try {
+      FileSystem fs = FileSystem.get(hadoopConf);
+      file = fs.getFileStatus(parentPath);
+      InputPartitionFields partitionInfo = initPartitionInfo();
+      partitionValues =
+          partitionExtractor.createPartitionValues(
+              partitionExtractor.extractPartitionValues(partitionInfo));
+    } catch (java.io.IOException e) {
+      throw new RuntimeException("Unable to read file status for " + parentPath, e);
+    }
+ return InternalDataFile.builder()
+ .physicalPath(parentPath.toString())
+ .fileFormat(FileFormat.APACHE_PARQUET)
+ .partitionValues(partitionValues)
+ .fileSizeBytes(file.getLen())
+ .recordCount(recordCount)
+ .columnStats(stats.values().stream().collect(Collectors.toList()))
+ .lastModified(file.getModificationTime())
+ .build();
+ }
+
+ private static class Stats {
+ long min = Long.MAX_VALUE;
+ long max = Long.MIN_VALUE;
+ long total = 0;
+
+ public void add(long length) {
+ min = Math.min(length, min);
+ max = Math.max(length, max);
+ total += length;
+ }
+ }
+
+ private static class ColStats {
+
+ Stats valueCountStats = new Stats();
+ Stats allStats = new Stats();
+ Stats uncompressedStats = new Stats();
+    Set<Encoding> encodings = new TreeSet<>();
+    Statistics<?> colValuesStats = null;
+ int blocks = 0;
+
+    private static void add(
+        ColumnDescriptor desc,
+        long valueCount,
+        long size,
+        long uncompressedSize,
+        Collection<Encoding> encodings,
+        Statistics<?> colValuesStats) {
+ ColStats colStats = stats.get(desc);
+ if (colStats == null) {
+ colStats = new ColStats();
+ stats.put(desc, colStats);
+ }
+ colStats.add(valueCount, size, uncompressedSize, encodings, colValuesStats);
+ }
+
+    public void add(
+        long valueCount,
+        long size,
+        long uncompressedSize,
+        Collection<Encoding> encodings,
+        Statistics<?> colValuesStats) {
+ ++blocks;
+ valueCountStats.add(valueCount);
+ allStats.add(size);
+ uncompressedStats.add(uncompressedSize);
+ this.encodings.addAll(encodings);
+ this.colValuesStats = colValuesStats;
+ }
+ }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetTableExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetTableExtractor.java
new file mode 100644
index 000000000..cc62b000a
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetTableExtractor.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import lombok.Builder;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.config.InputPartitionFields;
+
+/**
+ * Extracts {@link InternalTable} canonical representation of a table at a point in time for
+ * Parquet.
+ */
+@Builder
+public class ParquetTableExtractor {
+ private static final InputPartitionFields partitions = null;
+
+ private static final ParquetTableExtractor INSTANCE = new ParquetTableExtractor();
+
+ public static ParquetTableExtractor getInstance() {
+ return INSTANCE;
+ }
+
+  private static final ParquetTableExtractor tableExtractor = ParquetTableExtractor.getInstance();
+
+ private static final ParquetSchemaExtractor schemaExtractor =
+ ParquetSchemaExtractor.getInstance();
+
+  private static final ParquetPartitionValueExtractor partitionValueExtractor =
+      ParquetPartitionValueExtractor.getInstance();
+
+ /* @Builder.Default
+ private static final ParquetConversionSource parquetConversionSource =
+ ParquetConversionSource.getInstance();*/
+
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+
+ private InputPartitionFields initPartitionInfo() {
+ return partitions;
+ }
+ /* public String getBasePathFromLastModifiedTable(){
+ InternalTable table = parquetConversionSource.getTable(-1L);
+ return table.getBasePath();
+ }*/
+
+ /*public InternalTable table(String tableName, Set partitionKeys,MessageType schema) {
+ InternalSchema internalSchema = schemaExtractor.toInternalSchema(schema);
+ List partitionFields =
+ parquetConversionSource.initPartitionInfo().getPartitions();
+ List convertedPartitionFields = partitionValueExtractor.getInternalPartitionFields(partitionFields);
+ InternalTable snapshot = parquetConversionSource.getTable(-1L);
+ // Assuming InternalTable.java has its getters
+ Instant lastCommit = snapshot.latestCommitTime();
+ DataLayoutStrategy dataLayoutStrategy =
+ !partitionFields.isEmpty()
+ ? DataLayoutStrategy.HIVE_STYLE_PARTITION
+ : DataLayoutStrategy.FLAT;
+ return InternalTable.builder()
+ .tableFormat(TableFormat.PARQUET)
+ .basePath(getBasePathFromLastModifiedTable())
+ .name(tableName)
+ .layoutStrategy(dataLayoutStrategy)
+ .partitioningFields(convertedPartitionFields)
+ .readSchema(internalSchema)
+ .latestCommitTime(lastCommit)
+ .build();
+ }*/
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
index 3e9a133a2..1e50066a6 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
@@ -127,6 +127,7 @@ public abstract class TestAbstractHudiTable
throw new UncheckedIOException(ex);
}
}
+
// Name of the table
protected String tableName;
// Base path for the table
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
index ce3b25bda..abbe7fe67 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
@@ -66,6 +66,7 @@ public class TestJavaHudiTable extends TestAbstractHudiTable {
private HoodieJavaWriteClient writeClient;
private final Configuration conf;
+
/**
* Create a test table instance for general testing. The table is created with the schema defined
* in basic_schema.avsc which contains many data types to ensure they are handled correctly.
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java
index a25063c42..9475d5038 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java
@@ -36,6 +36,7 @@ public class StubCatalog implements Catalog {
public static void registerMock(String catalogName, Catalog catalog) {
REGISTERED_MOCKS.put(catalogName, catalog);
}
+
// use a mocked catalog instance to more easily test
private Catalog mockedCatalog;
diff --git a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
new file mode 100644
index 000000000..d8ed08072
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.junit.jupiter.api.Test;
+
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+
+public class TestParquetSchemaExtractor {
+  private static final ParquetSchemaExtractor schemaExtractor =
+      ParquetSchemaExtractor.getInstance();
+
+  @Test
+  public void testPrimitiveTypes() {
+    MessageType integerPrimitiveType =
+        Types.buildMessage()
+            .required(PrimitiveType.PrimitiveTypeName.INT32)
+            .named("integer")
+            .named("testRecord");
+    assertNotNull(integerPrimitiveType);
+    assertNotNull(schemaExtractor);
+    // TODO assert the converted InternalSchema once toInternalSchema handles top-level
+    // message types without logical type annotations
+  }
+}
diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
index 349b5ca93..33bf1c80b 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
@@ -283,11 +283,13 @@ public static class DatasetConfig {
* necessary connection and access details for describing and listing tables
*/
ExternalCatalogConfig sourceCatalog;
+
/**
* Defines configuration one or more target catalogs, to which XTable will write or update
* tables. Unlike the source, these catalogs must be writable
*/
List targetCatalogs;
+
/** A list of datasets that specify how a source table maps to one or more target tables. */
List datasets;
@@ -300,6 +302,7 @@ public static class DatasetConfig {
public static class Dataset {
/** Identifies the source table in sourceCatalog. */
SourceTableIdentifier sourceCatalogTableIdentifier;
+
/** A list of one or more targets that this source table should be written to. */
List targetCatalogTableIdentifiers;
}
@@ -310,6 +313,7 @@ public static class Dataset {
public static class SourceTableIdentifier {
/** Specifies the table identifier in the source catalog. */
TableIdentifier tableIdentifier;
+
/**
* (Optional) Provides direct storage details such as a table’s base path (like an S3
* location) and the partition specification. This allows reading from a source even if it is
@@ -327,11 +331,13 @@ public static class TargetTableIdentifier {
* updated
*/
String catalogId;
+
/**
* The target table format (e.g., DELTA, HUDI, ICEBERG), specifying how the data will be
* stored at the target.
*/
String tableFormat;
+
/** Specifies the table identifier in the target catalog. */
TableIdentifier tableIdentifier;
}
diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
index 1a7bda874..043570f39 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
+import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
@@ -53,6 +54,7 @@
import org.apache.xtable.conversion.TargetTable;
import org.apache.xtable.hudi.HudiSourceConfig;
import org.apache.xtable.iceberg.IcebergCatalogConfig;
+import org.apache.xtable.model.config.InputPartitionFields;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.model.sync.SyncMode;
import org.apache.xtable.reflection.ReflectionUtils;
@@ -114,7 +116,7 @@ public static void main(String[] args) throws IOException {
formatter.printHelp("RunSync", OPTIONS);
return;
}
-
+ InputPartitionFields partitions = null;
DatasetConfig datasetConfig = new DatasetConfig();
try (InputStream inputStream =
Files.newInputStream(Paths.get(cmd.getOptionValue(DATASET_CONFIG_OPTION)))) {
@@ -139,6 +141,11 @@ public static void main(String[] args) throws IOException {
sourceFormat, tableFormatConverters.getTableFormatConverters().keySet()));
}
String sourceProviderClass = sourceConversionConfig.conversionSourceProviderClass;
+        // load the parquet partition configuration when the parquet source provider is used
+        if (sourceProviderClass.contains("Parquet")) {
+          partitions =
+              getPartitionsFromUserConfiguration(
+                  Paths.get(cmd.getOptionValue(DATASET_CONFIG_OPTION)));
+        }
ConversionSourceProvider> conversionSourceProvider =
ReflectionUtils.createInstanceOfClass(sourceProviderClass);
conversionSourceProvider.init(hadoopConf);
@@ -186,6 +193,7 @@ public static void main(String[] args) throws IOException {
.sourceTable(sourceTable)
.targetTables(targetTables)
.syncMode(SyncMode.INCREMENTAL)
+ .partitions(partitions)
.build();
try {
conversionController.sync(conversionConfig, conversionSourceProvider);
@@ -242,6 +250,17 @@ static IcebergCatalogConfig loadIcebergCatalogConfig(byte[] customConfigs) throw
: YAML_MAPPER.readValue(customConfigs, IcebergCatalogConfig.class);
}
+  @VisibleForTesting
+  public static InputPartitionFields getPartitionsFromUserConfiguration(Path configPath)
+      throws IOException {
+    try (InputStream inputStream = Files.newInputStream(configPath)) {
+      return YAML_MAPPER.readValue(inputStream, InputPartitionFields.class);
+    }
+  }
+
+
@Data
public static class DatasetConfig {