diff --git a/pom.xml b/pom.xml index 3184a4be2..7359e5a67 100644 --- a/pom.xml +++ b/pom.xml @@ -47,12 +47,12 @@ - xtable-api - xtable-hudi-support + xtable-api + xtable-core - xtable-utilities - xtable-aws - xtable-hive-metastore + @@ -304,6 +304,41 @@ ${delta.hive.version} + + + org.apache.parquet + parquet + 1.13.0 + pom + + + + + org.apache.parquet + parquet-format + 2.10.0 + + + + + + org.apache.parquet + parquet-hadoop + 1.13.0 + + + + org.apache.parquet + parquet-column + 1.13.0 + + + + org.apache.parquet + parquet-common + 1.13.0 + + org.apache.spark @@ -794,7 +829,7 @@ - ${google.java.format.version} + 1.19.2 com.google.googlejavaformat:google-java-format diff --git a/xtable-api/pom.xml b/xtable-api/pom.xml index 43aa7bace..ab6c87020 100644 --- a/xtable-api/pom.xml +++ b/xtable-api/pom.xml @@ -29,6 +29,12 @@ XTable Project API + + + org.apache.parquet + parquet-column + 1.13.0 + com.fasterxml.jackson.core jackson-annotations diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java index 63e9d6733..b6145032d 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ConversionConfig.java @@ -28,6 +28,7 @@ import com.google.common.base.Preconditions; +import org.apache.xtable.model.config.InputPartitionFields; import org.apache.xtable.model.sync.SyncMode; @Value @@ -42,13 +43,16 @@ public class ConversionConfig { Map> targetCatalogs; // The mode, incremental or snapshot SyncMode syncMode; + // Input partition config for parquet + InputPartitionFields partitions; @Builder ConversionConfig( @NonNull SourceTable sourceTable, List targetTables, Map> targetCatalogs, - SyncMode syncMode) { + SyncMode syncMode, + InputPartitionFields partitions) { this.sourceTable = sourceTable; this.targetTables = targetTables; Preconditions.checkArgument( @@ -56,5 +60,6 @@ public class ConversionConfig { "Please provide at-least one format to sync"); this.targetCatalogs = targetCatalogs == null ? Collections.emptyMap() : targetCatalogs; this.syncMode = syncMode == null ? SyncMode.INCREMENTAL : syncMode; + this.partitions = partitions; } } diff --git a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java index 939c59c09..a6c97d8fa 100644 --- a/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java +++ b/xtable-api/src/main/java/org/apache/xtable/conversion/ExternalTable.java @@ -34,12 +34,16 @@ class ExternalTable { /** The name of the table. */ protected final @NonNull String name; + /** The format of the table (e.g. 
DELTA, ICEBERG, HUDI) */ protected final @NonNull String formatName; + /** The path to the root of the table or the metadata directory depending on the format */ protected final @NonNull String basePath; + /** Optional namespace for the table */ protected final String[] namespace; + /** The configuration for interacting with the catalog that manages this table */ protected final CatalogConfig catalogConfig; diff --git a/xtable-api/src/main/java/org/apache/xtable/model/catalog/HierarchicalTableIdentifier.java b/xtable-api/src/main/java/org/apache/xtable/model/catalog/HierarchicalTableIdentifier.java index e6d7cf0eb..2dfc2cd89 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/catalog/HierarchicalTableIdentifier.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/catalog/HierarchicalTableIdentifier.java @@ -23,12 +23,18 @@ * table names. Some catalogs may omit the catalog name. */ public interface HierarchicalTableIdentifier extends CatalogTableIdentifier { - /** @return the catalog name if present, otherwise null */ + /** + * @return the catalog name if present, otherwise null + */ String getCatalogName(); - /** @return the database (or schema) name; required */ + /** + * @return the database (or schema) name; required + */ String getDatabaseName(); - /** @return the table name; required */ + /** + * @return the table name; required + */ String getTableName(); } diff --git a/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java b/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java index 2608d36a3..f387c7d36 100644 --- a/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java +++ b/xtable-api/src/main/java/org/apache/xtable/model/catalog/ThreePartHierarchicalTableIdentifier.java @@ -42,6 +42,7 @@ public class ThreePartHierarchicalTableIdentifier implements HierarchicalTableId * name varies depending on the catalogType. */ String catalogName; + /** * Catalogs have the ability to group tables logically, databaseName is the identifier for such * logical classification. The alternate names for this field include namespace, schemaName etc. diff --git a/xtable-api/src/main/java/org/apache/xtable/model/config/InputPartitionField.java b/xtable-api/src/main/java/org/apache/xtable/model/config/InputPartitionField.java new file mode 100644 index 000000000..2d1ce83b2 --- /dev/null +++ b/xtable-api/src/main/java/org/apache/xtable/model/config/InputPartitionField.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.xtable.model.config;
+
+import lombok.Builder;
+import lombok.Value;
+
+import org.apache.xtable.model.schema.PartitionTransformType;
+
+/** A single user-supplied partition field and value for a Parquet source table. */
+@Value
+@Builder(toBuilder = true)
+public class InputPartitionField {
+  String partitionFieldName;
+  String partitionValue;
+  PartitionTransformType transformType;
+}
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/config/InputPartitionFields.java b/xtable-api/src/main/java/org/apache/xtable/model/config/InputPartitionFields.java
new file mode 100644
index 000000000..10eb27369
--- /dev/null
+++ b/xtable-api/src/main/java/org/apache/xtable/model/config/InputPartitionFields.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.model.config;
+
+import java.util.List;
+
+import lombok.Builder;
+import lombok.Value;
+
+/** User-supplied partition configuration for a Parquet source table. */
+@Value
+@Builder(toBuilder = true)
+public class InputPartitionFields {
+  String sourceField;
+  List<InputPartitionField> partitions;
+}
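For context, a sketch of how a caller might populate these two config types; the column names and values below are hypothetical, and PartitionTransformType is assumed to expose YEAR/MONTH/DAY/HOUR constants:

    InputPartitionField yearField =
        InputPartitionField.builder()
            .partitionFieldName("year")                // hypothetical partition column
            .partitionValue("54")                      // offset from epoch; see the extractor below
            .transformType(PartitionTransformType.YEAR)
            .build();

    InputPartitionFields partitionConfig =
        InputPartitionFields.builder()
            .sourceField("created_at")                 // hypothetical source column
            .partitions(Collections.singletonList(yearField))
            .build();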
@@ -43,9 +43,11 @@ public class InternalField {
   // The id field for the field. This is used to identify the field in the schema even after
   // renames.
   Integer fieldId;
+
   // represents the fully qualified path to the field (dot separated)
   @Getter(lazy = true) String path = createPath();
+
   // splits the dot separated path into parts
   @Getter(lazy = true) String[] pathParts = splitPath();
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
index 20af37e0c..8b7e0fc59 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/schema/InternalSchema.java
@@ -75,7 +75,8 @@ public enum MetadataKey {
   public enum MetadataValue {
     MICROS,
-    MILLIS
+    MILLIS,
+    NANOS
   }
 
   public static final String XTABLE_LOGICAL_TYPE = "xtableLogicalType";
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/stat/ColumnStat.java b/xtable-api/src/main/java/org/apache/xtable/model/stat/ColumnStat.java
index 4e579418a..943d3b4f5 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/stat/ColumnStat.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/stat/ColumnStat.java
@@ -17,7 +17,9 @@
  */
 package org.apache.xtable.model.stat;
 
+import java.util.Set;
+
 import lombok.Builder;
 import lombok.Value;
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
index 3aee766ef..de6e98fb9 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/InternalDataFile.java
@@ -49,4 +49,8 @@ public class InternalDataFile {
   @Builder.Default @NonNull List<ColumnStat> columnStats = Collections.emptyList();
   // last modified time in millis since epoch
   long lastModified;
+
+  public static InternalDataFileBuilder builderFrom(InternalDataFile dataFile) {
+    return dataFile.toBuilder();
+  }
 }
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
index bea0b4774..ad727210e 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
@@ -27,8 +27,9 @@ public class TableFormat {
   public static final String HUDI = "HUDI";
   public static final String ICEBERG = "ICEBERG";
   public static final String DELTA = "DELTA";
+  public static final String PARQUET = "PARQUET";
 
   public static String[] values() {
-    return new String[] {"HUDI", "ICEBERG", "DELTA"};
+    return new String[] {"HUDI", "ICEBERG", "DELTA", "PARQUET"};
   }
 }
diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml
index 24bc31df5..9bf18bee9 100644
--- a/xtable-core/pom.xml
+++ b/xtable-core/pom.xml
@@ -57,6 +57,12 @@
+    <!-- parquet; version kept in line with the parquet artifacts managed in the root pom -->
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-avro</artifactId>
+      <version>1.13.0</version>
+    </dependency>
     <dependency>
       <groupId>org.apache.avro</groupId>
       <artifactId>avro</artifactId>
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
new file mode 100644
index 000000000..bcb352ad6
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSource.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import lombok.Builder;
+import lombok.NonNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.xtable.model.CommitsBacklog;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.InternalSnapshot;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.TableChange;
+import org.apache.xtable.model.config.InputPartitionField;
+import org.apache.xtable.model.config.InputPartitionFields;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Builder
+public class ParquetConversionSource implements ConversionSource<Long> {
+  private static final ParquetSchemaExtractor schemaExtractor =
+      ParquetSchemaExtractor.getInstance();
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+  private static final ParquetPartitionValueExtractor partitionValueExtractor =
+      ParquetPartitionValueExtractor.getInstance();
+  private static final ParquetStatsExtractor parquetStatsExtractor =
+      ParquetStatsExtractor.getInstance();
+
+  // user supplied partition config; parquet files do not carry partition metadata themselves
+  private final InputPartitionFields partitions;
+  private final String tableName;
+  private final String basePath;
+  // user config path for the parquet partition configuration
+  private final String configPath;
+  @NonNull private final Configuration hadoopConf;
+
+  private InputPartitionFields initPartitionInfo() {
+    // TODO: load from the user configuration at configPath instead of requiring the caller to
+    // pass the parsed InputPartitionFields through the builder
+    return partitions;
+  }
+
+  /**
+   * Infers the table schema from the most recently modified parquet file, on the assumption that
+   * the newest file carries the latest schema.
+   *
+   * @param modificationTime currently unused; listing always resolves the newest file
+   * @return the table state derived from the newest parquet file
+   */
+  @Override
+  public InternalTable getTable(Long modificationTime) {
+    List<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+    LocatedFileStatus latestFile =
+        parquetFiles.stream()
+            .max(Comparator.comparing(FileStatus::getModificationTime))
+            .orElseThrow(
+                () -> new IllegalStateException("No parquet files found under " + basePath));
+
+    ParquetMetadata parquetMetadata =
+        parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.getPath());
+    MessageType tableSchema = parquetMetadataExtractor.getSchema(parquetMetadata);
+
+    InputPartitionFields partitionInfo = initPartitionInfo();
+    List<String> partitionKeys =
+        partitionInfo == null
+            ? Collections.emptyList()
+            : partitionInfo.getPartitions().stream()
+                .map(InputPartitionField::getPartitionFieldName)
+                .collect(Collectors.toList());
+
+    // TODO: merge the partition fields into the schema; partition columns are not part of the
+    // parquet files themselves
+    InternalSchema schema = schemaExtractor.toInternalSchema(tableSchema, null, null);
+
+    List<InternalPartitionField> partitionFields =
+        partitionKeys.isEmpty()
+            ? Collections.emptyList()
+            : partitionValueExtractor.getInternalPartitionFields(partitionInfo);
+    DataLayoutStrategy dataLayoutStrategy =
+        partitionFields.isEmpty()
+            ? DataLayoutStrategy.FLAT
+            : DataLayoutStrategy.HIVE_STYLE_PARTITION;
+    return InternalTable.builder()
+        .tableFormat(TableFormat.PARQUET)
+        .basePath(basePath)
+        .name(tableName)
+        .layoutStrategy(dataLayoutStrategy)
+        .partitioningFields(partitionFields)
+        .readSchema(schema)
+        .latestCommitTime(Instant.ofEpochMilli(latestFile.getModificationTime()))
+        .build();
+  }
+
+  public List<InternalDataFile> getInternalDataFiles() {
+    List<LocatedFileStatus> parquetFiles = getParquetFiles(hadoopConf, basePath);
+    List<PartitionValue> partitionValuesFromConfig =
+        partitions == null
+            ? Collections.emptyList()
+            : partitionValueExtractor.createPartitionValues(
+                partitionValueExtractor.extractPartitionValues(partitions));
+    return parquetFiles.stream()
+        .map(
+            file ->
+                InternalDataFile.builder()
+                    .physicalPath(file.getPath().toString())
+                    .fileFormat(FileFormat.APACHE_PARQUET)
+                    .fileSizeBytes(file.getLen())
+                    .partitionValues(partitionValuesFromConfig)
+                    .lastModified(file.getModificationTime())
+                    .columnStats(
+                        parquetStatsExtractor.getColumnStatsForFile(
+                            parquetMetadataExtractor.readParquetMetadata(
+                                hadoopConf, file.getPath())))
+                    .build())
+        .collect(Collectors.toList());
+  }
+
+  // incremental sync is not yet supported for parquet sources, since the unit of change here is
+  // a file rather than a table commit; isIncrementalSyncSafeFrom always returns false so these
+  // methods should never be reached
+  @Override
+  public CommitsBacklog<Long> getCommitsBacklog(InstantsForIncrementalSync lastSyncInstant) {
+    throw new UnsupportedOperationException(
+        "Incremental sync is not yet supported for parquet sources");
+  }
+
+  @Override
+  public TableChange getTableChangeForCommit(Long commit) {
+    throw new UnsupportedOperationException(
+        "Incremental sync is not yet supported for parquet sources");
+  }
+
+  @Override
+  public InternalTable getCurrentTable() {
+    return getTable(-1L);
+  }
+
+  /**
+   * Builds the current snapshot by listing all parquet files under the base path; -1 is passed to
+   * {@link #getTable(Long)} because the modification time argument is not used.
+   *
+   * @return the current snapshot of the table
+   */
+  @Override
+  public InternalSnapshot getCurrentSnapshot() {
+    List<InternalDataFile> internalDataFiles = getInternalDataFiles();
+    InternalTable table = getTable(-1L);
+    return InternalSnapshot.builder()
+        .table(table)
+        .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
+        .build();
+  }
+
+  /* private Schema mergeAvroSchema(Schema internalSchema, Set<String> partitionFields) {
+    SchemaBuilder.FieldAssembler<Schema> fieldAssembler =
+        SchemaBuilder.record(internalSchema.getName()).fields();
+    for (Schema.Field field : internalSchema.getFields()) {
+      fieldAssembler = fieldAssembler.name(field.name()).type(field.schema()).noDefault();
+    }
+    for (String partitionKey : partitionFields) {
+      fieldAssembler = fieldAssembler.name(partitionKey).type().stringType().noDefault();
+    }
+    return fieldAssembler.endRecord();
+  }*/
+
+  /* private Type mergeParquetSchema(MessageType internalSchema, List<String> partitionFields) {
+    List<Type> listOfAllFields = internalSchema.getFields();
+    Type fieldsToMerge = listOfAllFields.get(0);
+    listOfAllFields.remove(0);
+    // start the merge
+    for (Type field : internalSchema.getFields()) {
+      fieldsToMerge = fieldsToMerge.union(field, false);
+    }
+    // for (String partition : partitionFields) {
+    //   create a Type from the partition, TODO: check further...
+    //   fieldsToMerge = fieldsToMerge.union(new Type(partition, Repetition.REQUIRED), false);
+    // }
+    return fieldsToMerge;
+  }*/
+
+  public List<LocatedFileStatus> getParquetFiles(Configuration hadoopConf, String basePath) {
+    try {
+      FileSystem fs = new Path(basePath).getFileSystem(hadoopConf);
+      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(basePath), true);
+      return RemoteIterators.toList(iterator).stream()
+          .filter(file -> file.getPath().getName().endsWith(".parquet"))
+          .collect(Collectors.toList());
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to list parquet files under " + basePath, e);
+    }
+  }
+
+  @Override
+  public boolean isIncrementalSyncSafeFrom(Instant instant) {
+    return false;
+  }
+
+  @Override
+  public void close() {
+    // nothing to close; the source holds no long-lived resources
+  }
+}
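Taken together with the provider defined next, the expected wiring looks roughly like this; the table name and base path are hypothetical, and SourceTable is assumed to expose the same name/formatName/basePath builder setters used elsewhere in XTable:

    ParquetConversionSourceProvider provider = new ParquetConversionSourceProvider();
    provider.init(new Configuration());

    SourceTable sourceTable =
        SourceTable.builder()
            .name("events")                       // hypothetical table name
            .formatName(TableFormat.PARQUET)
            .basePath("s3://bucket/events/")      // hypothetical base path
            .build();

    ParquetConversionSource source = provider.getConversionSourceInstance(sourceTable);
    InternalSnapshot snapshot = source.getCurrentSnapshot();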
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java
new file mode 100644
index 000000000..ba3bb3a07
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetConversionSourceProvider.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+
+/** A concrete implementation of {@link ConversionSourceProvider} for the Parquet table format. */
+public class ParquetConversionSourceProvider extends ConversionSourceProvider<Long> {
+  @Override
+  public ParquetConversionSource getConversionSourceInstance(SourceTable sourceTable) {
+    // TODO: plumb the user-supplied partition configuration (InputPartitionFields) through to
+    // the source once it is available on the conversion config
+    return ParquetConversionSource.builder()
+        .tableName(sourceTable.getName())
+        .basePath(sourceTable.getBasePath())
+        // use the configuration passed to init() rather than a fresh Configuration
+        .hadoopConf(hadoopConf)
+        .build();
+  }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
new file mode 100644
index 000000000..a0cfc4863
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetMetadataExtractor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+public class ParquetMetadataExtractor {
+
+  private static final ParquetMetadataExtractor INSTANCE = new ParquetMetadataExtractor();
+
+  public static ParquetMetadataExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public static MessageType getSchema(ParquetMetadata footer) {
+    return footer.getFileMetaData().getSchema();
+  }
+
+  public static ParquetMetadata readParquetMetadata(Configuration conf, Path path) {
+    try {
+      return ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER);
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to read parquet footer for " + path, e);
+    }
+  }
+}
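With the footer reader now actually reading the file, per-file schema access reduces to a couple of calls (the file path here is hypothetical):

    Configuration conf = new Configuration();
    Path file = new Path("/data/events/part-00000.parquet"); // hypothetical path
    ParquetMetadata footer = ParquetMetadataExtractor.readParquetMetadata(conf, file);
    MessageType schema = ParquetMetadataExtractor.getSchema(footer);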
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
new file mode 100644
index 000000000..c1b53b668
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetPartitionValueExtractor.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.time.Instant;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.xtable.model.config.InputPartitionField;
+import org.apache.xtable.model.config.InputPartitionFields;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+
+/** Partition value extractor for Parquet. */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class ParquetPartitionValueExtractor {
+  private static final OffsetDateTime EPOCH = Instant.ofEpochSecond(0).atOffset(ZoneOffset.UTC);
+  private static final ParquetPartitionValueExtractor INSTANCE =
+      new ParquetPartitionValueExtractor();
+
+  public static ParquetPartitionValueExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public List<InternalPartitionField> convertFromParquetUserDefinedPartitionConfig(
+      InputPartitionFields userDefinedPartitionSchema) {
+    if (userDefinedPartitionSchema.getPartitions().isEmpty()) {
+      return Collections.emptyList();
+    }
+    return getInternalPartitionFields(userDefinedPartitionSchema);
+  }
+
+  public List<InternalPartitionField> getInternalPartitionFields(InputPartitionFields partitions) {
+    List<InternalPartitionField> partitionFields = new ArrayList<>();
+    String sourceField = partitions.getSourceField();
+    for (InputPartitionField partition : partitions.getPartitions()) {
+      partitionFields.add(
+          InternalPartitionField.builder()
+              // TODO derive the source field data type from the user partition configuration
+              .sourceField(
+                  InternalField.builder()
+                      .name(sourceField)
+                      .schema(
+                          InternalSchema.builder()
+                              .name(sourceField)
+                              .dataType(InternalType.STRING)
+                              .build())
+                      .build())
+              .transformType(partition.getTransformType())
+              .build());
+    }
+    return partitionFields;
+  }
+
+  public List<PartitionValue> createPartitionValues(
+      Map<InternalPartitionField, Range> extractedPartitions) {
+    return extractedPartitions.entrySet().stream()
+        .map(
+            entry ->
+                PartitionValue.builder()
+                    .partitionField(entry.getKey())
+                    .range(entry.getValue())
+                    .build())
+        .collect(Collectors.toList());
+  }
+
+  public Map<InternalPartitionField, Range> extractPartitionValues(
+      InputPartitionFields partitionsConf) {
+    Map<InternalPartitionField, Range> partitionValues = new HashMap<>();
+    for (InputPartitionField partitionField : partitionsConf.getPartitions()) {
+      Object value;
+      // Convert date based partitions into millis since epoch. Note that the configured value is
+      // interpreted as an offset from the Unix epoch, not as a literal year/month/day/hour.
+      switch (partitionField.getTransformType()) {
+        case YEAR:
+          value =
+              EPOCH
+                  .plusYears(Long.parseLong(partitionField.getPartitionValue()))
+                  .toInstant()
+                  .toEpochMilli();
+          break;
+        case MONTH:
+          value =
+              EPOCH
+                  .plusMonths(Long.parseLong(partitionField.getPartitionValue()))
+                  .toInstant()
+                  .toEpochMilli();
+          break;
+        case DAY:
+          value =
+              EPOCH
+                  .plusDays(Long.parseLong(partitionField.getPartitionValue()))
+                  .toInstant()
+                  .toEpochMilli();
+          break;
+        case HOUR:
+          value =
+              EPOCH
+                  .plusHours(Long.parseLong(partitionField.getPartitionValue()))
+                  .toInstant()
+                  .toEpochMilli();
+          break;
+        default:
+          value = partitionField.getPartitionValue();
+      }
+      // build one internal partition field per configured partition (previously only the last
+      // entry of the loop was kept)
+      InternalPartitionField internalPartitionField =
+          InternalPartitionField.builder()
+              .sourceField(
+                  InternalField.builder()
+                      .name(partitionsConf.getSourceField())
+                      .schema(
+                          InternalSchema.builder()
+                              .name(partitionsConf.getSourceField())
+                              // TODO derive the real data type from the configuration
+                              .dataType(InternalType.STRING)
+                              .build())
+                      .build())
+              .transformType(partitionField.getTransformType())
+              .build();
+      partitionValues.put(internalPartitionField, Range.scalar(value));
+    }
+    return partitionValues;
+  }
+}
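To make the epoch arithmetic above concrete: the extractor treats the configured value as an offset from the Unix epoch rather than a literal date part, a convention worth double-checking against user expectations. For transform YEAR and value "54":

    long millis =
        OffsetDateTime.of(1970, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC)
            .plusYears(54)        // 1970 + 54 = 2024-01-01T00:00:00Z
            .toInstant()
            .toEpochMilli();      // 1704067200000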
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
new file mode 100644
index 000000000..031b98df4
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetSchemaExtractor.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.apache.parquet.schema.Type.Repetition;
+
+import org.apache.xtable.exception.UnsupportedSchemaTypeException;
+import org.apache.xtable.hudi.idtracking.models.IdMapping;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.schema.SchemaUtils;
+
+/**
+ * Converts a parquet schema {@link MessageType} to the canonical schema {@link InternalSchema} and
+ * back. The conversion is currently lossy in places (see the TODOs below); a strict one to one
+ * mapping between parquet data types and canonical data types is the goal but is not yet
+ * guaranteed.
+ */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class ParquetSchemaExtractor {
+  // parquet only supports string keys in maps
+  private static final InternalField MAP_KEY_FIELD =
+      InternalField.builder()
+          .name(InternalField.Constants.MAP_KEY_FIELD_NAME)
+          .schema(
+              InternalSchema.builder()
+                  .name("map_key")
+                  .dataType(InternalType.STRING)
+                  .isNullable(false)
+                  .build())
+          .defaultValue("")
+          .build();
+  private static final ParquetSchemaExtractor INSTANCE = new ParquetSchemaExtractor();
+  private static final String ELEMENT = "element";
+  private static final String KEY = "key";
+  private static final String VALUE = "value";
+
+  public static ParquetSchemaExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  private static boolean groupTypeContainsNull(GroupType schema) {
+    // a field may hold nulls when its repetition is OPTIONAL
+    for (Type field : schema.getFields()) {
+      if (field.isRepetition(Repetition.OPTIONAL)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private Map<String, IdMapping> getChildIdMap(IdMapping idMapping) {
+    if (idMapping == null) {
+      return Collections.emptyMap();
+    }
+    return idMapping.getFields().stream()
+        .collect(Collectors.toMap(IdMapping::getName, Function.identity()));
+  }
+
+  /**
+   * Converts a parquet {@link Type} (including the root {@link MessageType}) to an
+   * {@link InternalSchema}.
+   *
+   * @param schema The schema being converted
+   * @param parentPath If this schema is nested within another, this will be a dot separated string
+   *     representing the path from the top most field to the current schema.
+   * @param fieldNameToIdMapping map of fieldName to IdMapping to track field IDs provided by the
+   *     source schema. If the source schema does not contain IdMappings, the map may be null or
+   *     empty.
+   * @return a converted schema
+   */
+  public InternalSchema toInternalSchema(
+      Type schema, String parentPath, Map<String, IdMapping> fieldNameToIdMapping) {
+    // TODO - does not handle recursive parquet schemas
+    Map<String, IdMapping> idMappings =
+        fieldNameToIdMapping == null ? Collections.emptyMap() : fieldNameToIdMapping;
+    Map<InternalSchema.MetadataKey, Object> metadata = new HashMap<>();
+    if (schema.isPrimitive()) {
+      PrimitiveType primitiveType = schema.asPrimitiveType();
+      LogicalTypeAnnotation logicalType = schema.getLogicalTypeAnnotation();
+      InternalType newDataType;
+      if (logicalType instanceof LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) {
+        // decimals may be backed by INT32, INT64, BINARY or FIXED_LEN_BYTE_ARRAY
+        LogicalTypeAnnotation.DecimalLogicalTypeAnnotation decimalType =
+            (LogicalTypeAnnotation.DecimalLogicalTypeAnnotation) logicalType;
+        metadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, decimalType.getPrecision());
+        metadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, decimalType.getScale());
+        newDataType = InternalType.DECIMAL;
+      } else {
+        switch (primitiveType.getPrimitiveTypeName()) {
+          case INT64:
+            if (logicalType instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
+              LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampType =
+                  (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalType;
+              // TIMESTAMP vs TIMESTAMP_NTZ is determined by isAdjustedToUTC, not by the unit
+              newDataType =
+                  timestampType.isAdjustedToUTC()
+                      ? InternalType.TIMESTAMP
+                      : InternalType.TIMESTAMP_NTZ;
+              if (timestampType.getUnit() == LogicalTypeAnnotation.TimeUnit.MICROS) {
+                metadata.put(
+                    InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+                    InternalSchema.MetadataValue.MICROS);
+              } else if (timestampType.getUnit() == LogicalTypeAnnotation.TimeUnit.NANOS) {
+                metadata.put(
+                    InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+                    InternalSchema.MetadataValue.NANOS);
+              } else {
+                metadata.put(
+                    InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+                    InternalSchema.MetadataValue.MILLIS);
+              }
+            } else {
+              // plain INT64, int(64) and time annotations all map to LONG for now
+              newDataType = InternalType.LONG;
+            }
+            break;
+          case INT32:
+            if (logicalType instanceof LogicalTypeAnnotation.DateLogicalTypeAnnotation) {
+              newDataType = InternalType.DATE;
+            } else {
+              // plain INT32, int(32) and time(MILLIS) annotations all map to INT for now
+              newDataType = InternalType.INT;
+            }
+            break;
+          case INT96:
+            // INT96 is a legacy physical type used for nanosecond timestamps
+            metadata.put(
+                InternalSchema.MetadataKey.TIMESTAMP_PRECISION,
+                InternalSchema.MetadataValue.NANOS);
+            newDataType = InternalType.TIMESTAMP;
+            break;
+          case FLOAT:
+            newDataType = InternalType.FLOAT;
+            break;
+          case DOUBLE:
+            newDataType = InternalType.DOUBLE;
+            break;
+          case FIXED_LEN_BYTE_ARRAY:
+            if (logicalType instanceof LogicalTypeAnnotation.UUIDLogicalTypeAnnotation) {
+              newDataType = InternalType.UUID;
+            } else if (logicalType
+                instanceof LogicalTypeAnnotation.IntervalLogicalTypeAnnotation) {
+              metadata.put(InternalSchema.MetadataKey.FIXED_BYTES_SIZE, 12);
+              newDataType = InternalType.FIXED;
+            } else {
+              metadata.put(
+                  InternalSchema.MetadataKey.FIXED_BYTES_SIZE, primitiveType.getTypeLength());
+              newDataType = InternalType.FIXED;
+            }
+            break;
+          case BINARY:
+            if (logicalType instanceof LogicalTypeAnnotation.EnumLogicalTypeAnnotation) {
+              // parquet does not store enum symbols in the file schema, so the values cannot be
+              // recovered here; expose the column as a string
+              newDataType = InternalType.STRING;
+            } else if (logicalType instanceof LogicalTypeAnnotation.StringLogicalTypeAnnotation) {
+              newDataType = InternalType.STRING;
+            } else {
+              // JSON, BSON and un-annotated binary are all surfaced as bytes
+              newDataType = InternalType.BYTES;
+            }
+            break;
+          case BOOLEAN:
+            newDataType = InternalType.BOOLEAN;
+            break;
+          default:
+            throw new UnsupportedSchemaTypeException(
+                String.format("Unsupported schema type %s", schema));
+        }
+      }
+      return InternalSchema.builder()
+          .name(schema.getName())
+          .dataType(newDataType)
+          .isNullable(schema.isRepetition(Repetition.OPTIONAL))
+          .metadata(metadata.isEmpty() ? null : metadata)
+          .build();
+    }
+    // group types: the root message and plain structs become RECORD; LIST groups are flattened
+    // the same way for now and MAP is not yet supported
+    GroupType groupType = schema.asGroupType();
+    if (schema.getOriginalType() == null || schema.getOriginalType() == OriginalType.LIST) {
+      List<InternalField> subFields = new ArrayList<>(groupType.getFieldCount());
+      for (Type parquetField : groupType.getFields()) {
+        IdMapping idMapping = idMappings.get(parquetField.getName());
+        InternalSchema subFieldSchema =
+            toInternalSchema(
+                parquetField,
+                SchemaUtils.getFullyQualifiedPath(parentPath, parquetField.getName()),
+                getChildIdMap(idMapping));
+        subFields.add(
+            InternalField.builder()
+                .parentPath(parentPath)
+                .name(parquetField.getName())
+                .schema(subFieldSchema)
+                .fieldId(idMapping == null ? null : idMapping.getId())
+                .build());
+      }
+      return InternalSchema.builder()
+          .name(groupType.getName())
+          .dataType(InternalType.RECORD)
+          .fields(subFields)
+          .isNullable(groupTypeContainsNull(groupType))
+          .build();
+    }
+    /* TODO: MAP support, from an earlier draft:
+    case MAP:
+      IdMapping keyMapping = idMappings.get(KEY);
+      IdMapping valueMapping = idMappings.get(VALUE);
+      InternalSchema valueSchema =
+          toInternalSchema(
+              valueType,
+              SchemaUtils.getFullyQualifiedPath(
+                  parentPath, InternalField.Constants.MAP_VALUE_FIELD_NAME),
+              getChildIdMap(valueMapping));
+      InternalField valueField =
+          InternalField.builder()
+              .name(InternalField.Constants.MAP_VALUE_FIELD_NAME)
+              .parentPath(parentPath)
+              .schema(valueSchema)
+              .fieldId(valueMapping == null ? null : valueMapping.getId())
+              .build();
+      return InternalSchema.builder()
+          .name(schema.getName())
+          .dataType(InternalType.MAP)
+          .isNullable(groupTypeContainsNull(schema))
+          .fields(
+              Arrays.asList(
+                  MAP_KEY_FIELD.toBuilder()
+                      .parentPath(parentPath)
+                      .fieldId(keyMapping == null ? null : keyMapping.getId())
+                      .build(),
+                  valueField))
+          .build();
+    */
+    throw new UnsupportedSchemaTypeException(String.format("Unsupported schema type %s", schema));
+  }
+
+  /**
+   * Internal method for converting an {@link InternalSchema} to a parquet
+   * {@link LogicalTypeAnnotation}. This covers annotated primitive types only; group types
+   * (records, lists, maps) need a full parquet {@link Type} conversion which is not yet
+   * implemented.
+   *
+   * @param internalSchema internal schema representation
+   * @param currentPath If this schema is nested within another, this will be a dot separated
+   *     string. This is used for the parquet namespace to guarantee unique names for nested
+   *     records.
+   * @return a parquet logical type annotation, or null for types that carry no annotation
+   */
+  private LogicalTypeAnnotation fromInternalSchema(
+      InternalSchema internalSchema, String currentPath) {
+    switch (internalSchema.getDataType()) {
+      case BOOLEAN:
+      case FLOAT:
+      case BYTES:
+        // these physical types carry no logical type annotation
+        return null;
+      case INT:
+        return LogicalTypeAnnotation.intType(32, true);
+      case LONG:
+        return LogicalTypeAnnotation.intType(64, true);
+      case STRING:
+        return LogicalTypeAnnotation.stringType();
+      case DECIMAL:
+        int precision =
+            (int) internalSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_PRECISION);
+        int scale =
+            (int) internalSchema.getMetadata().get(InternalSchema.MetadataKey.DECIMAL_SCALE);
+        return LogicalTypeAnnotation.decimalType(scale, precision);
+      case ENUM:
+        return LogicalTypeAnnotation.enumType();
+      case DATE:
+        return LogicalTypeAnnotation.dateType();
+      case TIMESTAMP:
+        return LogicalTypeAnnotation.timestampType(true, getTimeUnit(internalSchema));
+      case TIMESTAMP_NTZ:
+        // NTZ timestamps are not adjusted to UTC
+        return LogicalTypeAnnotation.timestampType(false, getTimeUnit(internalSchema));
+      default:
+        // RECORD and the remaining types require building a parquet GroupType rather than a
+        // logical type annotation. TODO: implement full InternalSchema -> parquet Type
+        // conversion.
+        throw new UnsupportedSchemaTypeException(
+            "Encountered unhandled type during InternalSchema to parquet conversion: "
+                + internalSchema.getDataType());
+    }
+  }
+
+  private static LogicalTypeAnnotation.TimeUnit getTimeUnit(InternalSchema internalSchema) {
+    Object precision =
+        internalSchema.getMetadata() == null
+            ? null
+            : internalSchema.getMetadata().get(InternalSchema.MetadataKey.TIMESTAMP_PRECISION);
+    if (precision == InternalSchema.MetadataValue.MICROS) {
+      return LogicalTypeAnnotation.TimeUnit.MICROS;
+    }
+    if (precision == InternalSchema.MetadataValue.NANOS) {
+      return LogicalTypeAnnotation.TimeUnit.NANOS;
+    }
+    return LogicalTypeAnnotation.TimeUnit.MILLIS;
+  }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
new file mode 100644
index 000000000..d82805276
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetStatsExtractor.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
+import lombok.Builder;
+import lombok.Value;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
+
+import org.apache.xtable.model.config.InputPartitionFields;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+import org.apache.xtable.model.storage.FileFormat;
+import org.apache.xtable.model.storage.InternalDataFile;
+
+@Value
+@Builder
+public class ParquetStatsExtractor {
+
+  private static final ParquetStatsExtractor INSTANCE = ParquetStatsExtractor.builder().build();
+  private static final ParquetPartitionValueExtractor partitionExtractor =
+      ParquetPartitionValueExtractor.getInstance();
+  private static final ParquetSchemaExtractor schemaExtractor =
+      ParquetSchemaExtractor.getInstance();
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+  private static Map<ColumnDescriptor, ColStats> stats = new LinkedHashMap<>();
+  // TODO: derive the record count from the footer instead of defaulting to 0
+  private static long recordCount = 0;
+  private final InputPartitionFields partitions;
+
+  public static ParquetStatsExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public static List<ColumnStat> getColumnStatsForFile(ParquetMetadata footer) {
+    return getStatsForFile(footer).values().stream()
+        .flatMap(List::stream)
+        .collect(Collectors.toList());
+  }
+
+  public static Map<ColumnDescriptor, List<ColumnStat>> getStatsForFile(ParquetMetadata footer) {
+    MessageType schema = parquetMetadataExtractor.getSchema(footer);
+    // collect across all row groups at once; collecting per block would overwrite earlier blocks
+    return footer.getBlocks().stream()
+        .flatMap(blockMetaData -> blockMetaData.getColumns().stream())
+        .collect(
+            Collectors.groupingBy(
+                (ColumnChunkMetaData columnMetaData) ->
+                    schema.getColumnDescription(columnMetaData.getPath().toArray()),
+                Collectors.mapping(
+                    columnMetaData ->
+                        ColumnStat.builder()
+                            .numValues(columnMetaData.getValueCount())
+                            .totalSize(columnMetaData.getTotalSize())
+                            .range(
+                                Range.vector(
+                                    columnMetaData.getStatistics().genericGetMin(),
+                                    columnMetaData.getStatistics().genericGetMax()))
+                            .build(),
+                    Collectors.toList())));
+  }
+
+  private InputPartitionFields initPartitionInfo() {
+    return partitions;
+  }
+
+  private InternalDataFile toInternalDataFile(
+      Configuration hadoopConf, Path parentPath, Map<ColumnDescriptor, ColumnStat> stats)
+      throws IOException {
+    // let IO failures propagate; the previous version swallowed them and then dereferenced null
+    FileSystem fs = FileSystem.get(hadoopConf);
+    FileStatus file = fs.getFileStatus(parentPath);
+    InputPartitionFields partitionInfo = initPartitionInfo();
+    List<PartitionValue> partitionValues =
+        partitionExtractor.createPartitionValues(
+            partitionExtractor.extractPartitionValues(partitionInfo));
+    return InternalDataFile.builder()
+        .physicalPath(parentPath.toString())
+        .fileFormat(FileFormat.APACHE_PARQUET)
+        .partitionValues(partitionValues)
+        .fileSizeBytes(file.getLen())
+        .recordCount(recordCount)
+        .columnStats(new ArrayList<>(stats.values()))
+        .lastModified(file.getModificationTime())
+        .build();
+  }
+
+  // aggregate helpers carried over from parquet-tools style stats printing; currently only
+  // populated through ColStats.add
+  private static class Stats {
+    long min = Long.MAX_VALUE;
+    long max = Long.MIN_VALUE;
+    long total = 0;
+
+    public void add(long length) {
+      min = Math.min(length, min);
+      max = Math.max(length, max);
+      total += length;
+    }
+  }
+
+  private static class ColStats {
+
+    Stats valueCountStats = new Stats();
+    Stats allStats = new Stats();
+    Stats uncompressedStats = new Stats();
+    Set<Encoding> encodings = new TreeSet<>();
+    Statistics colValuesStats = null;
+    int blocks = 0;
+
+    private static void add(
+        ColumnDescriptor desc,
+        long valueCount,
+        long size,
+        long uncompressedSize,
+        Collection<Encoding> encodings,
+        Statistics colValuesStats) {
+      ColStats colStats = stats.get(desc);
+      if (colStats == null) {
+        colStats = new ColStats();
+        stats.put(desc, colStats);
+      }
+      colStats.add(valueCount, size, uncompressedSize, encodings, colValuesStats);
+    }
+
+    public void add(
+        long valueCount,
+        long size,
+        long uncompressedSize,
+        Collection<Encoding> encodings,
+        Statistics colValuesStats) {
+      ++blocks;
+      valueCountStats.add(valueCount);
+      allStats.add(size);
+      uncompressedStats.add(uncompressedSize);
+      this.encodings.addAll(encodings);
+      this.colValuesStats = colValuesStats;
+    }
+  }
+}
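Assuming the footer reader above, pulling per-column stats for a single file would look roughly like this (the path is hypothetical, and the getters are the ones Lombok derives from ColumnStat's fields):

    ParquetMetadata footer =
        ParquetMetadataExtractor.readParquetMetadata(conf, new Path("/data/f.parquet"));
    List<ColumnStat> columnStats = ParquetStatsExtractor.getColumnStatsForFile(footer);
    columnStats.forEach(
        s -> System.out.println(s.getNumValues() + " values, " + s.getTotalSize() + " bytes"));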
diff --git a/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetTableExtractor.java b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetTableExtractor.java
new file mode 100644
index 000000000..cc62b000a
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/parquet/ParquetTableExtractor.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import lombok.Builder;
+
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.config.InputPartitionFields;
+
+/**
+ * Extracts the {@link InternalTable} canonical representation of a Parquet table at a point in
+ * time.
+ */
+@Builder
+public class ParquetTableExtractor {
+  private static final InputPartitionFields partitions = null;
+
+  private static final ParquetTableExtractor INSTANCE = new ParquetTableExtractor();
+
+  public static ParquetTableExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  private static final ParquetTableExtractor tableExtractor = ParquetTableExtractor.getInstance();
+
+  private static final ParquetSchemaExtractor schemaExtractor =
+      ParquetSchemaExtractor.getInstance();
+
+  private static final ParquetPartitionValueExtractor partitionValueExtractor =
+      ParquetPartitionValueExtractor.getInstance();
+
+  private static final ParquetMetadataExtractor parquetMetadataExtractor =
+      ParquetMetadataExtractor.getInstance();
+
+  private InputPartitionFields initPartitionInfo() {
+    return partitions;
+  }
+
+  /* public String getBasePathFromLastModifiedTable() {
+    InternalTable table = parquetConversionSource.getTable(-1L);
+    return table.getBasePath();
+  }*/
+
+  /* public InternalTable table(String tableName, Set<String> partitionKeys, MessageType schema) {
+    InternalSchema internalSchema = schemaExtractor.toInternalSchema(schema);
+    List<InputPartitionField> partitionFields =
+        parquetConversionSource.initPartitionInfo().getPartitions();
+    List<InternalPartitionField> convertedPartitionFields =
+        partitionValueExtractor.getInternalPartitionFields(partitionFields);
+    InternalTable snapshot = parquetConversionSource.getTable(-1L);
+    Instant lastCommit = snapshot.latestCommitTime();
+    DataLayoutStrategy dataLayoutStrategy =
+        !partitionFields.isEmpty()
+            ? DataLayoutStrategy.HIVE_STYLE_PARTITION
+            : DataLayoutStrategy.FLAT;
+    return InternalTable.builder()
+        .tableFormat(TableFormat.PARQUET)
+        .basePath(getBasePathFromLastModifiedTable())
+        .name(tableName)
+        .layoutStrategy(dataLayoutStrategy)
+        .partitioningFields(convertedPartitionFields)
+        .readSchema(internalSchema)
+        .latestCommitTime(lastCommit)
+        .build();
+  }*/
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
index 3e9a133a2..1e50066a6 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestAbstractHudiTable.java
@@ -127,6 +127,7 @@ public abstract class TestAbstractHudiTable
       throw new UncheckedIOException(ex);
     }
   }
+
   // Name of the table
   protected String tableName;
   // Base path for the table
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
index ce3b25bda..abbe7fe67 100644
--- a/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/TestJavaHudiTable.java
@@ -66,6 +66,7 @@ public class TestJavaHudiTable extends TestAbstractHudiTable {
   private HoodieJavaWriteClient<HoodieAvroPayload> writeClient;
 
   private final Configuration conf;
+
   /**
    * Create a test table instance for general testing. The table is created with the schema defined
    * in basic_schema.avsc which contains many data types to ensure they are handled correctly.
diff --git a/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java b/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java
index a25063c42..9475d5038 100644
--- a/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java
+++ b/xtable-core/src/test/java/org/apache/xtable/iceberg/StubCatalog.java
@@ -36,6 +36,7 @@ public class StubCatalog implements Catalog {
   public static void registerMock(String catalogName, Catalog catalog) {
     REGISTERED_MOCKS.put(catalogName, catalog);
   }
+
   // use a mocked catalog instance to more easily test
   private Catalog mockedCatalog;
diff --git a/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
new file mode 100644
index 000000000..d8ed08072
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/parquet/TestParquetSchemaExtractor.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.xtable.parquet;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+
+/** Initial coverage for {@link ParquetSchemaExtractor}; more fixtures are still needed. */
+public class TestParquetSchemaExtractor {
+  private static final ParquetSchemaExtractor schemaExtractor =
+      ParquetSchemaExtractor.getInstance();
+
+  @Test
+  public void testPrimitiveTypes() {
+    MessageType messageType =
+        Types.buildMessage()
+            .required(PrimitiveType.PrimitiveTypeName.INT32)
+            .named("id")
+            .named("testRecord");
+    InternalSchema internalSchema = schemaExtractor.toInternalSchema(messageType, null, null);
+    Assertions.assertEquals(InternalType.RECORD, internalSchema.getDataType());
+    Assertions.assertEquals(
+        InternalType.INT, internalSchema.getFields().get(0).getSchema().getDataType());
+  }
+}
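A natural follow-up test, sketched under the assumption that the isAdjustedToUTC-based timestamp mapping introduced above is kept, would pin down the INT64 timestamp branch:

    @Test
    public void testTimestampMicros() {
      MessageType messageType =
          Types.buildMessage()
              .required(PrimitiveType.PrimitiveTypeName.INT64)
              .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS))
              .named("ts")
              .named("testRecord");
      InternalSchema schema = schemaExtractor.toInternalSchema(messageType, null, null);
      Assertions.assertEquals(
          InternalType.TIMESTAMP, schema.getFields().get(0).getSchema().getDataType());
    }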
diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
index 349b5ca93..33bf1c80b 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunCatalogSync.java
@@ -283,11 +283,13 @@ public static class DatasetConfig {
      * necessary connection and access details for describing and listing tables
      */
     ExternalCatalogConfig sourceCatalog;
+
     /**
      * Defines configuration for one or more target catalogs, to which XTable will write or update
      * tables. Unlike the source, these catalogs must be writable
      */
     List<ExternalCatalogConfig> targetCatalogs;
+
     /** A list of datasets that specify how a source table maps to one or more target tables. */
     List<Dataset> datasets;
 
@@ -300,6 +302,7 @@ public static class DatasetConfig {
   public static class Dataset {
     /** Identifies the source table in sourceCatalog. */
     SourceTableIdentifier sourceCatalogTableIdentifier;
+
     /** A list of one or more targets that this source table should be written to. */
     List<TargetTableIdentifier> targetCatalogTableIdentifiers;
   }
@@ -310,6 +313,7 @@ public static class Dataset {
   public static class SourceTableIdentifier {
     /** Specifies the table identifier in the source catalog. */
     TableIdentifier tableIdentifier;
+
     /**
      * (Optional) Provides direct storage details such as a table’s base path (like an S3
      * location) and the partition specification. This allows reading from a source even if it is
@@ -327,11 +331,13 @@ public static class TargetTableIdentifier {
      * updated
      */
     String catalogId;
+
     /**
      * The target table format (e.g., DELTA, HUDI, ICEBERG), specifying how the data will be
      * stored at the target.
      */
     String tableFormat;
+
     /** Specifies the table identifier in the target catalog. */
     TableIdentifier tableIdentifier;
   }
diff --git a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
index 1a7bda874..043570f39 100644
--- a/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
+++ b/xtable-utilities/src/main/java/org/apache/xtable/utilities/RunSync.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.Map;
@@ -53,6 +54,7 @@
 import org.apache.xtable.conversion.TargetTable;
 import org.apache.xtable.hudi.HudiSourceConfig;
 import org.apache.xtable.iceberg.IcebergCatalogConfig;
+import org.apache.xtable.model.config.InputPartitionFields;
 import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.model.sync.SyncMode;
 import org.apache.xtable.reflection.ReflectionUtils;
@@ -114,7 +116,7 @@ public static void main(String[] args) throws IOException {
       formatter.printHelp("RunSync", OPTIONS);
       return;
     }
-
+    InputPartitionFields partitions = null;
     DatasetConfig datasetConfig = new DatasetConfig();
     try (InputStream inputStream =
         Files.newInputStream(Paths.get(cmd.getOptionValue(DATASET_CONFIG_OPTION)))) {
@@ -139,6 +141,11 @@ public static void main(String[] args) throws IOException {
               sourceFormat, tableFormatConverters.getTableFormatConverters().keySet()));
     }
     String sourceProviderClass = sourceConversionConfig.conversionSourceProviderClass;
+    // load the partition configuration only for parquet sources; compare the table format with
+    // equals() rather than comparing the provider class name with ==
+    if (TableFormat.PARQUET.equals(sourceFormat)) {
+      partitions =
+          getPartitionsFromUserConfiguration(Paths.get(cmd.getOptionValue(DATASET_CONFIG_OPTION)));
+    }
     ConversionSourceProvider<?> conversionSourceProvider =
         ReflectionUtils.createInstanceOfClass(sourceProviderClass);
     conversionSourceProvider.init(hadoopConf);
@@ -186,6 +193,7 @@ public static void main(String[] args) throws IOException {
             .sourceTable(sourceTable)
             .targetTables(targetTables)
             .syncMode(SyncMode.INCREMENTAL)
+            .partitions(partitions)
             .build();
     try {
       conversionController.sync(conversionConfig, conversionSourceProvider);
@@ -242,6 +250,15 @@ static IcebergCatalogConfig loadIcebergCatalogConfig(byte[] customConfigs) throw
         : YAML_MAPPER.readValue(customConfigs, IcebergCatalogConfig.class);
   }
 
+  @VisibleForTesting
+  public static InputPartitionFields getPartitionsFromUserConfiguration(Path configPath)
+      throws IOException {
+    // read from the supplied config path (the previous version passed null to newInputStream)
+    try (InputStream inputStream = Files.newInputStream(configPath)) {
+      return YAML_MAPPER.readValue(inputStream, InputPartitionFields.class);
+    }
+  }
+
   @Data
   public static class DatasetConfig {