Skip to content

First commit on supporting parquet #650

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 49 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
74cbc83
First commit on supporting parquet
Feb 15, 2025
79bd222
catch file not found exception
Feb 17, 2025
2143c99
executed mvn spotless:apply
Feb 17, 2025
4f1ea77
added byte_array data type
Feb 18, 2025
f71610b
added ParquetStatsExtractor
Feb 20, 2025
c57a42f
added InternalDataFile population from parquet metadata
Feb 22, 2025
1557ea3
added col stats for parquet
Feb 22, 2025
24c474a
set todos
Feb 22, 2025
e1a3f35
integrated ParquetPartitionExtractor.java
Feb 23, 2025
fbbd1eb
added partitionValues to StatsExtractor builder
Feb 23, 2025
40c5e67
added the parquet conversion source provider
Feb 23, 2025
ec222de
run mvn spotless:apply
Feb 24, 2025
e0fbca8
edited ParquetSchemaExtractor to include some other LogicalTypes and …
Mar 5, 2025
6e2fc66
ParquetSchemaExtractor few fixes
Mar 5, 2025
b4c49b7
ParquetSchemaExtractor NULL type added
Mar 5, 2025
cac552a
ParquetSchemaExtractor Numeric and time types OK, TODO : Arrays and Maps
Mar 5, 2025
004d763
ParquetSchemaExtractor added groupTypes Map and List: TODO: tests
Mar 5, 2025
4b4593b
added -write parquet- to test Parquet types
Mar 6, 2025
9d56c21
added first test for primitive types
Mar 8, 2025
18ef037
cleanups
Mar 9, 2025
bd11c67
added timestamp metadata (millis, micros, nanos)
Mar 11, 2025
0dbedb0
added else type for each switch case
Mar 11, 2025
0233d54
added string type
Mar 11, 2025
8fc6a95
added Time type
Mar 11, 2025
c88fb25
added metadata for ENUM and FIXED
Mar 11, 2025
6c04cc7
adjusted primitive type detection
Mar 11, 2025
9bdd972
adjusted primitive types for fromInternalSchema sync, TODO: ENUM, LIS…
Mar 11, 2025
924db34
logic for partitionFields (from user configuration) and updated Conve…
Mar 14, 2025
271756e
adjusted data class for reading user config
Mar 14, 2025
f7db318
removed unacessary class
Mar 14, 2025
1323f63
added alternative methods for ParquetSchemaExtractor: to test
Mar 14, 2025
5c87799
fixed small error in the previous commit
Mar 14, 2025
c53b7c5
fixed small errors
Mar 14, 2025
80b9300
partitions are read from config
Mar 14, 2025
c54d038
conversion source and schema extractor link fixed, TODO: split into t…
Mar 14, 2025
c49dbaa
Schema Extractor: List and Map and Fixed are converted Avro Types
Mar 14, 2025
f95f87a
read config source bug fix
Mar 15, 2025
9df1c42
FIXED type conversion ok
Mar 15, 2025
60fdc8a
fixed few compilation errors
Mar 16, 2025
eb7f60f
few other compilation errors fixed
Mar 16, 2025
96e91cd
few other compilation errors fixed 2
Mar 16, 2025
f1b7524
code compiling
Mar 17, 2025
a79af62
cleanups for ParquetStatsExtractor and ParquetSchemaExtractor: compil…
Mar 18, 2025
a27e172
cleanups for Parquet partition methods: compiling but not tested
Mar 18, 2025
c58fe53
cleanups for Parquet Conversion Source and adjust StatsExtractor for …
Mar 18, 2025
429581a
ENUM conversion using avro-parquet: compiling but not tested
Mar 19, 2025
0f3c60b
LIST and RECORD for parquet data conversion
Mar 19, 2025
60dced9
Boolean for parquet data conversion, code compiling, not tested
Mar 20, 2025
1b698bd
StatsExtractor adjusted to remove encodings and add min max vals
Mar 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,16 @@
class ExternalTable {
/** The name of the table. */
protected final @NonNull String name;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these changes coming from mvn spotless:apply ? Wondering how latest main branch doesn't reflect these.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

regarding how to get partitionValues, I think it is best to discuss this in our meeting of today, but anyways, can it be asked from the user as YAML configuration?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/** The format of the table (e.g. DELTA, ICEBERG, HUDI) */
protected final @NonNull String formatName;

/** The path to the root of the table or the metadata directory depending on the format */
protected final @NonNull String basePath;

/** Optional namespace for the table */
protected final String[] namespace;

/** The configuration for interacting with the catalog that manages this table */
protected final CatalogConfig catalogConfig;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,18 @@
* table names. Some catalogs may omit the catalog name.
*/
public interface HierarchicalTableIdentifier extends CatalogTableIdentifier {
/** @return the catalog name if present, otherwise null */
/**
* @return the catalog name if present, otherwise null
*/
String getCatalogName();

/** @return the database (or schema) name; required */
/**
* @return the database (or schema) name; required
*/
String getDatabaseName();

/** @return the table name; required */
/**
* @return the table name; required
*/
String getTableName();
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class ThreePartHierarchicalTableIdentifier implements HierarchicalTableId
* name varies depending on the catalogType.
*/
String catalogName;

/**
* Catalogs have the ability to group tables logically, databaseName is the identifier for such
* logical classification. The alternate names for this field include namespace, schemaName etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,11 @@ public class InternalField {
// The id field for the field. This is used to identify the field in the schema even after
// renames.
Integer fieldId;

// represents the fully qualified path to the field (dot separated)
@Getter(lazy = true)
String path = createPath();

// splits the dot separated path into parts
@Getter(lazy = true)
String[] pathParts = splitPath();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,8 @@ public class InternalDataFile {
@Builder.Default @NonNull List<ColumnStat> columnStats = Collections.emptyList();
// last modified time in millis since epoch
long lastModified;

public static InternalDataFileBuilder builderFrom(InternalDataFile dataFile) {
return dataFile.toBuilder();
}
Comment on lines +53 to +55
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this static method required when there is already a toBuilder method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did I add this method? I use it nowhere in the current version of my code.

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,270 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.parquet;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

APL header is missing. Please run spotless plugin on the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just did, but on the other hand, not sure what is RFC, is there docs that explain what is?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is an example: #634

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@unical1988 Here's the template, I can help if you have more clarifications, we can discuss in the slack.
https://github.com/apache/incubator-xtable/blob/main/rfc/template.md

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


import java.io.IOException;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;

import lombok.Builder;
import lombok.NonNull;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.parquet.Schema;
import org.apache.parquet.SchemaBuilder;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import org.apache.xtable.model.*;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.*;
import org.apache.xtable.spi.extractor.ConversionSource;

@Builder
public class ParquetConversionSource implements ConversionSource<Long> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Few clarification questions to ensure we are on the same page.

  1. Will this source assume partitioning based on directory structure or user can choose partition columns from the parquet file schema ?
  2. If a parquet file is removed from the source root path, will it be handled or ignored ? Using file notifications makes this easier but we can find a way to do this through listing as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I guess getPartitionFromDirectoryStructure() does get the partitions from the directory, but optionally I can add retrieving partitioning from columns (as set by the user).
  2. I can add catching (FileNotFoundException) to handle the case when the source path is not found.


private final String tableName;
private final String basePath;
@NonNull private final Configuration hadoopConf;

@Builder.Default
private static final ParquetSchemaExtractor schemaExtractor =
ParquetSchemaExtractor.getInstance();

@Builder.Default
private static final ParquetMetadataExtractor parquetMetadataExtractor =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class is for stats ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vinishjail97 no, but I just added a first method for that class named ParquetStatsExtractor

ParquetMetadataExtractor.getInstance();

@Builder.Default
private static final ParquetPartitionExtractor parquetPartitionExtractor =
ParquetPartitionExtractor.getInstance();

@Builder.Default
private static final ParquetStatsExtractor parquetStatsExtractor =
ParquetStatsExtractor.getInstance();

private Map<String, List<String>> initPartitionInfo() {
return getPartitionFromDirectoryStructure(hadoopConf, basePath, Collections.emptyMap());
}

/**
* To infer schema getting the latest file assumption is that latest file will have new fields
*
* @param modificationTime the commit to consider for reading the table state
* @return
*/
@Override
public InternalTable getTable(Long modificationTime) {

Optional<LocatedFileStatus> latestFile =
getParquetFiles(hadoopConf, basePath)
.max(Comparator.comparing(FileStatus::getModificationTime));

ParquetMetadata parquetMetadata =
parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.get().getPath());
Schema tableSchema =
new org.apache.parquet.parquet.ParquetSchemaConverter()
.convert(parquetMetadataExtractor.getSchema(parquetMetadata));

Set<String> partitionKeys = initPartitionInfo().keySet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be user input.


// merge schema of partition into original as partition is not part of parquet fie
if (!partitionKeys.isEmpty()) {
tableSchema = mergeParquetSchema(tableSchema, partitionKeys);
}
InternalSchema schema = schemaExtractor.toInternalSchema(tableSchema);

List<InternalPartitionField> partitionFields =
partitionKeys.isEmpty()
? Collections.emptyList()
: parquetPartitionExtractor.getInternalPartitionField(partitionKeys, schema);
DataLayoutStrategy dataLayoutStrategy =
partitionFields.isEmpty()
? DataLayoutStrategy.FLAT
: DataLayoutStrategy.HIVE_STYLE_PARTITION;
return InternalTable.builder()
.tableFormat(TableFormat.PARQUET)
.basePath(basePath)
.name(tableName)
.layoutStrategy(dataLayoutStrategy)
.partitioningFields(partitionFields)
.readSchema(schema)
.latestCommitTime(Instant.ofEpochMilli(latestFile.get().getModificationTime()))
.build();
}

/**
* Here to get current snapshot listing all files hence the -1 is being passed
*
* @return
*/
@Override
public InternalSnapshot getCurrentSnapshot() {

List<LocatedFileStatus> latestFile =
getParquetFiles(hadoopConf, basePath).collect(Collectors.toList());
Map<String, List<String>> partitionInfo = initPartitionInfo();
InternalTable table = getTable(-1L);
List<InternalDataFile> internalDataFiles =
latestFile.stream()
.map(
file ->
InternalDataFile.builder()
.physicalPath(file.getPath().toString())
.fileFormat(FileFormat.APACHE_PARQUET)
.fileSizeBytes(file.getLen())
.partitionValues(
parquetPartitionExtractor.getPartitionValue(
basePath,
file.getPath().toString(),
table.getReadSchema(),
partitionInfo))
.lastModified(file.getModificationTime())
.columnStats(
parquetStatsExtractor
.getColumnStatsForaFile(
parquetMetadataExtractor.readParquetMetadata(
hadoopConf, file.getPath().toString()))
.build())
.collect(Collectors.toList()));

return InternalSnapshot.builder()
.table(table)
.partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
.build();
}

/**
* Whenever new file is added , condition to get new file is listing files whose modification time
* is greater than previous ysnc
*
* @param modificationTime commit to capture table changes for.
* @return
*/
@Override
public TableChange getTableChangeForCommit(Long modificationTime) {
List<FileStatus> tableChanges =
getParquetFiles(hadoopConf, basePath)
.filter(fileStatus -> fileStatus.getModificationTime() > modificationTime)
.collect(Collectors.toList());
// TODO avoid doing full list of directory to get schema , just argument of modification time
// needs to be tweaked
InternalTable internalTable = getTable(-1L);
Set<InternalDataFile> internalDataFiles = new HashSet<>();
Map<String, List<String>> partitionInfo = initPartitionInfo();
for (FileStatus tableStatus : tableChanges) {
internalDataFiles.add(
InternalDataFile.builder()
.physicalPath(tableStatus.getPath().toString())
.partitionValues(
parquetPartitionExtractor.getPartitionValue(
basePath,
tableStatus.getPath().toString(),
internalTable.getReadSchema(),
partitionInfo))
.lastModified(tableStatus.getModificationTime())
.fileSizeBytes(tableStatus.getLen())
.columnStats(
parquetMetadataExtractor.getColumnStatsForaFile(
hadoopConf, tableStatus, internalTable))
.build());
}

return TableChange.builder()
.tableAsOfChange(internalTable)
.filesDiff(DataFilesDiff.builder().filesAdded(internalDataFiles).build())
.build();
}

@Override
public CommitsBacklog<Long> getCommitsBacklog(
InstantsForIncrementalSync instantsForIncrementalSync) {

List<Long> commitsToProcess =
Collections.singletonList(instantsForIncrementalSync.getLastSyncInstant().toEpochMilli());

return CommitsBacklog.<Long>builder().commitsToProcess(commitsToProcess).build();
}

// TODO Need to understnad how this needs to be implemented should _SUCCESS or .staging dir needs
// to be checked
@Override
public boolean isIncrementalSyncSafeFrom(Instant instant) {
return true;
}

@Override
public void close() throws IOException {}

private Schema mergeParquetSchema(Schema internalSchema, Set<String> parititonFields) {

SchemaBuilder.FieldAssembler<Schema> fieldAssembler =
SchemaBuilder.record(internalSchema.getName()).fields();
for (Schema.Field field : internalSchema.getFields()) {
fieldAssembler = fieldAssembler.name(field.name()).type(field.schema()).noDefault();
}

for (String paritionKey : parititonFields) {
fieldAssembler = fieldAssembler.name(paritionKey).type().stringType().noDefault();
}

return fieldAssembler.endRecord();
}

public Stream<LocatedFileStatus> getParquetFiles(Configuration hadoopConf, String basePath) {
try {
FileSystem fs = FileSystem.get(hadoopConf);
RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(basePath), true);
return remoteIteratorToStream(iterator)
.filter(file -> file.getPath().getName().endsWith("parquet"));
} catch (IOException | FileNotFoundException e) {
throw new RuntimeException(e);
}
}

public Map<String, List<String>> getPartitionFromDirectoryStructure(
Configuration hadoopConf, String basePath, Map<String, List<String>> partitionMap) {

try {
FileSystem fs = FileSystem.get(hadoopConf);
FileStatus[] baseFileStatus = fs.listStatus(new Path(basePath));
Map<String, List<String>> currentPartitionMap = new HashMap<>(partitionMap);

for (FileStatus dirStatus : baseFileStatus) {
if (dirStatus.isDirectory()) {
String partitionPath = dirStatus.getPath().getName();
if (partitionPath.contains("=")) {
String[] partitionKeyValue = partitionPath.split("=");
currentPartitionMap
.computeIfAbsent(partitionKeyValue[0], k -> new ArrayList<>())
.add(partitionKeyValue[1]);
getPartitionFromDirectoryStructure(
hadoopConf, dirStatus.getPath().toString(), partitionMap);
}
}
}
return currentPartitionMap;

} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.xtable.parquet;

import org.apache.hadoop.conf.Configuration;

import org.apache.xtable.conversion.ConversionSourceProvider;
import org.apache.xtable.conversion.SourceTable;

/** A concrete implementation of {@link ConversionSourceProvider} for Delta Lake table format. */
public class ParquetConversionSourceProvider extends ConversionSourceProvider<Long> {
@Override
public ParquetConversionSource getConversionSourceInstance(SourceTable sourceTable) {

return ParquetConversionSource.builder()
.tableName(sourceTable.getName())
.basePath(sourceTable.getBasePath())
.hadoopConf(new Configuration())
.build();
}
}
Loading