First commit on supporting parquet #650
base: main
Changes from 19 commits
@@ -49,4 +49,8 @@ public class InternalDataFile {
  @Builder.Default @NonNull List<ColumnStat> columnStats = Collections.emptyList();
  // last modified time in millis since epoch
  long lastModified;

  public static InternalDataFileBuilder builderFrom(InternalDataFile dataFile) {
    return dataFile.toBuilder();
  }
Comment on lines +53 to +55

Reviewer: Why is this static method required when there is already a toBuilder() available?

Author: Did I add this method? I use it nowhere in the current version of my code.
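For reference, a minimal usage sketch of what toBuilder() already provides (assuming InternalDataFile is annotated with Lombok's @Builder(toBuilder = true), which the generated InternalDataFileBuilder suggests; existingFile is a hypothetical instance):

    // Derive a modified copy from an existing instance without a static helper.
    InternalDataFile updated =
        existingFile.toBuilder()
            .lastModified(System.currentTimeMillis())
            .build();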
}
@@ -0,0 +1,270 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.xtable.parquet;
Reviewer: APL header is missing. Please run the spotless plugin on the code.

Author: I just did. On the other hand, I'm not sure what an RFC is; are there docs that explain it?

Reviewer: Here is an example: #634

Reviewer: @unical1988 Here's the template. I can help if you have more clarifications; we can discuss in the Slack.
import java.io.IOException;
import java.time.Instant;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import lombok.Builder;
import lombok.NonNull;

// Schema and SchemaBuilder are Avro classes; the original diff imported them
// from org.apache.parquet, where no such classes exist.
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

import org.apache.xtable.model.*;
import org.apache.xtable.model.schema.InternalPartitionField;
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.*;
import org.apache.xtable.spi.extractor.ConversionSource;

@Builder
public class ParquetConversionSource implements ConversionSource<Long> {
Reviewer: A few clarification questions to ensure we are on the same page.
  private final String tableName;
  private final String basePath;
  @NonNull private final Configuration hadoopConf;

  @Builder.Default
  private static final ParquetSchemaExtractor schemaExtractor =
      ParquetSchemaExtractor.getInstance();

  @Builder.Default
  private static final ParquetMetadataExtractor parquetMetadataExtractor =
      ParquetMetadataExtractor.getInstance();

Reviewer: This class is for stats?

Author: @vinishjail97 No, but I just added a first method for that class, named getColumnStatsForaFile.

  @Builder.Default
  private static final ParquetPartitionExtractor parquetPartitionExtractor =
      ParquetPartitionExtractor.getInstance();

  @Builder.Default
  private static final ParquetStatsExtractor parquetStatsExtractor =
      ParquetStatsExtractor.getInstance();

  private Map<String, List<String>> initPartitionInfo() {
    return getPartitionFromDirectoryStructure(hadoopConf, basePath, Collections.emptyMap());
  }
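  // Illustrative note (assumption, not in the original diff): for a hive-style
  // layout such as basePath/year=2023/month=01/part-00000.parquet, the returned
  // map would be {"year" -> ["2023"], "month" -> ["01"]}.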
  /**
   * To infer the schema, the most recently modified file is read; the assumption is
   * that the latest file will contain any newly added fields.
   *
   * @param modificationTime the commit to consider for reading the table state
   * @return the table state derived from the latest parquet file
   */
  @Override
  public InternalTable getTable(Long modificationTime) {

    Optional<LocatedFileStatus> latestFile =
        getParquetFiles(hadoopConf, basePath)
            .max(Comparator.comparing(FileStatus::getModificationTime));

    ParquetMetadata parquetMetadata =
        parquetMetadataExtractor.readParquetMetadata(hadoopConf, latestFile.get().getPath());
    // Convert the parquet footer schema (a MessageType) into an Avro schema.
    // Assumption: the original diff referenced org.apache.parquet.parquet.ParquetSchemaConverter,
    // which does not exist; parquet-avro's AvroSchemaConverter is the likely intent.
    Schema tableSchema =
        new org.apache.parquet.avro.AvroSchemaConverter()
            .convert(parquetMetadataExtractor.getSchema(parquetMetadata));

    Set<String> partitionKeys = initPartitionInfo().keySet();

Reviewer: This will be user input.
    // merge the partition fields into the original schema, since partition
    // columns are not part of the parquet file itself
    if (!partitionKeys.isEmpty()) {
      tableSchema = mergeParquetSchema(tableSchema, partitionKeys);
    }
    InternalSchema schema = schemaExtractor.toInternalSchema(tableSchema);

    List<InternalPartitionField> partitionFields =
        partitionKeys.isEmpty()
            ? Collections.emptyList()
            : parquetPartitionExtractor.getInternalPartitionField(partitionKeys, schema);
    DataLayoutStrategy dataLayoutStrategy =
        partitionFields.isEmpty()
            ? DataLayoutStrategy.FLAT
            : DataLayoutStrategy.HIVE_STYLE_PARTITION;
    return InternalTable.builder()
        .tableFormat(TableFormat.PARQUET)
        .basePath(basePath)
        .name(tableName)
        .layoutStrategy(dataLayoutStrategy)
        .partitioningFields(partitionFields)
        .readSchema(schema)
        .latestCommitTime(Instant.ofEpochMilli(latestFile.get().getModificationTime()))
        .build();
  }
  /**
   * Builds the current snapshot by listing all parquet files under the base path;
   * -1 is passed to getTable so no modification-time filter is applied.
   *
   * @return the current snapshot of the table
   */
  @Override
  public InternalSnapshot getCurrentSnapshot() {

    List<LocatedFileStatus> parquetFiles =
        getParquetFiles(hadoopConf, basePath).collect(Collectors.toList());
    Map<String, List<String>> partitionInfo = initPartitionInfo();
    InternalTable table = getTable(-1L);
    List<InternalDataFile> internalDataFiles =
        parquetFiles.stream()
            .map(
                file ->
                    InternalDataFile.builder()
                        .physicalPath(file.getPath().toString())
                        .fileFormat(FileFormat.APACHE_PARQUET)
                        .fileSizeBytes(file.getLen())
                        .partitionValues(
                            parquetPartitionExtractor.getPartitionValue(
                                basePath,
                                file.getPath().toString(),
                                table.getReadSchema(),
                                partitionInfo))
                        .lastModified(file.getModificationTime())
                        .columnStats(
                            parquetStatsExtractor.getColumnStatsForaFile(
                                parquetMetadataExtractor.readParquetMetadata(
                                    hadoopConf, file.getPath())))
                        .build())
            .collect(Collectors.toList());

    return InternalSnapshot.builder()
        .table(table)
        .partitionedDataFiles(PartitionFileGroup.fromFiles(internalDataFiles))
        .build();
  }
  /**
   * Whenever a new file is added, it is detected by listing the files whose
   * modification time is greater than the previous sync time.
   *
   * @param modificationTime commit to capture table changes for.
   * @return the files added since the given modification time
   */
  @Override
  public TableChange getTableChangeForCommit(Long modificationTime) {
    List<FileStatus> tableChanges =
        getParquetFiles(hadoopConf, basePath)
            .filter(fileStatus -> fileStatus.getModificationTime() > modificationTime)
            .collect(Collectors.toList());
    // TODO avoid doing a full listing of the directory to get the schema; the
    // modification-time argument needs to be tweaked
    InternalTable internalTable = getTable(-1L);
    Set<InternalDataFile> internalDataFiles = new HashSet<>();
    Map<String, List<String>> partitionInfo = initPartitionInfo();
    for (FileStatus tableStatus : tableChanges) {
      internalDataFiles.add(
          InternalDataFile.builder()
              .physicalPath(tableStatus.getPath().toString())
              .partitionValues(
                  parquetPartitionExtractor.getPartitionValue(
                      basePath,
                      tableStatus.getPath().toString(),
                      internalTable.getReadSchema(),
                      partitionInfo))
              .lastModified(tableStatus.getModificationTime())
              .fileSizeBytes(tableStatus.getLen())
              .columnStats(
                  parquetMetadataExtractor.getColumnStatsForaFile(
                      hadoopConf, tableStatus, internalTable))
              .build());
    }

    return TableChange.builder()
        .tableAsOfChange(internalTable)
        .filesDiff(DataFilesDiff.builder().filesAdded(internalDataFiles).build())
        .build();
  }
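  // Illustrative note (not in the original diff): if the previous sync ran at
  // epoch-millis T, any parquet file with modification time greater than T is
  // reported; the resulting diff carries filesAdded exclusively, so only
  // additions are detected.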
  @Override
  public CommitsBacklog<Long> getCommitsBacklog(
      InstantsForIncrementalSync instantsForIncrementalSync) {

    List<Long> commitsToProcess =
        Collections.singletonList(instantsForIncrementalSync.getLastSyncInstant().toEpochMilli());

    return CommitsBacklog.<Long>builder().commitsToProcess(commitsToProcess).build();
  }
  // TODO Need to understand how this should be implemented: whether _SUCCESS or a
  // .staging dir needs to be checked
  @Override
  public boolean isIncrementalSyncSafeFrom(Instant instant) {
    return true;
  }

  @Override
  public void close() throws IOException {}
  private Schema mergeParquetSchema(Schema fileSchema, Set<String> partitionFields) {

    SchemaBuilder.FieldAssembler<Schema> fieldAssembler =
        SchemaBuilder.record(fileSchema.getName()).fields();
    for (Schema.Field field : fileSchema.getFields()) {
      fieldAssembler = fieldAssembler.name(field.name()).type(field.schema()).noDefault();
    }

    // partition columns are appended as string fields, since hive-style
    // partition values are read from directory names
    for (String partitionKey : partitionFields) {
      fieldAssembler = fieldAssembler.name(partitionKey).type().stringType().noDefault();
    }

    return fieldAssembler.endRecord();
  }
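  // Illustrative example (not in the original diff): merging partition keys
  // {"year", "month"} into a record {id: long, name: string} produces
  // {id: long, name: string, year: string, month: string}, with the partition
  // columns typed as strings.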
  public Stream<LocatedFileStatus> getParquetFiles(Configuration hadoopConf, String basePath) {
    try {
      FileSystem fs = FileSystem.get(hadoopConf);
      RemoteIterator<LocatedFileStatus> iterator = fs.listFiles(new Path(basePath), true);
      return remoteIteratorToStream(iterator)
          .filter(file -> file.getPath().getName().endsWith(".parquet"));
    } catch (IOException e) {
      // FileNotFoundException is a subclass of IOException, so one catch suffices
      throw new RuntimeException(e);
    }
  }
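  // Sketch (not in the original diff): getParquetFiles references a
  // remoteIteratorToStream helper that the diff does not define. A minimal
  // implementation could simply materialize the listing; the caller already
  // wraps the call in a try/catch for IOException.
  private static Stream<LocatedFileStatus> remoteIteratorToStream(
      RemoteIterator<LocatedFileStatus> iterator) throws IOException {
    List<LocatedFileStatus> files = new ArrayList<>();
    while (iterator.hasNext()) {
      files.add(iterator.next());
    }
    return files.stream();
  }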
  // Walks the directory tree under basePath and collects hive-style partition
  // key/value pairs from directory names (e.g. year=2023).
  public Map<String, List<String>> getPartitionFromDirectoryStructure(
      Configuration hadoopConf, String basePath, Map<String, List<String>> partitionMap) {

    try {
      FileSystem fs = FileSystem.get(hadoopConf);
      FileStatus[] baseFileStatus = fs.listStatus(new Path(basePath));
      Map<String, List<String>> currentPartitionMap = new HashMap<>(partitionMap);

      for (FileStatus dirStatus : baseFileStatus) {
        if (dirStatus.isDirectory()) {
          String partitionPath = dirStatus.getPath().getName();
          if (partitionPath.contains("=")) {
            String[] partitionKeyValue = partitionPath.split("=");
            currentPartitionMap
                .computeIfAbsent(partitionKeyValue[0], k -> new ArrayList<>())
                .add(partitionKeyValue[1]);
            // recurse into nested partition directories, carrying the
            // accumulated map forward so nested results are not discarded
            currentPartitionMap =
                getPartitionFromDirectoryStructure(
                    hadoopConf, dirStatus.getPath().toString(), currentPartitionMap);
          }
        }
      }
      return currentPartitionMap;

    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
}
@@ -0,0 +1,37 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.xtable.parquet;
import org.apache.hadoop.conf.Configuration;

import org.apache.xtable.conversion.ConversionSourceProvider;
import org.apache.xtable.conversion.SourceTable;

/** A concrete implementation of {@link ConversionSourceProvider} for the Parquet table format. */
public class ParquetConversionSourceProvider extends ConversionSourceProvider<Long> {
  @Override
  public ParquetConversionSource getConversionSourceInstance(SourceTable sourceTable) {

    return ParquetConversionSource.builder()
        .tableName(sourceTable.getName())
        .basePath(sourceTable.getBasePath())
        .hadoopConf(new Configuration())
        .build();
  }
}
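For context, a hedged usage sketch of the provider. The SourceTable builder fields are an assumption inferred from the getters used above (getName, getBasePath); treat the names and values as illustrative:

    // Wire the provider to a source table and read the current snapshot.
    SourceTable sourceTable =
        SourceTable.builder()
            .name("my_table") // hypothetical table name
            .basePath("file:///tmp/parquet-dataset") // hypothetical location
            .formatName(TableFormat.PARQUET)
            .build();
    ParquetConversionSource source =
        new ParquetConversionSourceProvider().getConversionSourceInstance(sourceTable);
    InternalSnapshot snapshot = source.getCurrentSnapshot();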
Reviewer: Are these changes coming from mvn spotless:apply? Wondering how the latest main branch doesn't reflect these.

Reviewer: https://github.com/apache/incubator-xtable/actions/runs/13485330237/job/37693458127?pr=650
The build failed because of spotless.

Author: Regarding how to get partitionValues, I think it is best to discuss this in our meeting today, but in any case, can it be asked from the user as YAML configuration?

Reviewer: Yes, we can use YAML configuration inputs.
https://github.com/apache/incubator-xtable/blob/main/README.md#running-the-bundled-jar
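For reference, a hedged sketch of what such a YAML input could look like, following the bundled-jar config format from the README linked above. The partitionSpec line for a PARQUET source is an assumption, mirroring how other sources declare partition fields:

    sourceFormat: PARQUET
    targetFormats:
      - DELTA
      - ICEBERG
    datasets:
      - tableBasePath: file:///tmp/parquet-dataset
        tableName: my_table
        partitionSpec: year:VALUE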