add a draft of column renaming feature to ParquetRewriter
maxim_konstantinov committed Oct 21, 2024
1 parent 42cf31c commit 842dfb7
Showing 3 changed files with 264 additions and 64 deletions.
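
In short, the change lets a caller rename top-level columns while rewriting. A minimal usage sketch, with hypothetical file paths and column names ("userId" -> "user_id"); the renameColumns(...) call is the option added by this commit:

import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.rewrite.ParquetRewriter;
import org.apache.parquet.hadoop.rewrite.RewriteOptions;

public class RenameColumnsExample {
  public static void main(String[] args) throws Exception {
    RewriteOptions options = new RewriteOptions.Builder(
            new Configuration(), new Path("input.parquet"), new Path("output.parquet"))
        .renameColumns(Collections.singletonMap("userId", "user_id")) // top-level columns only
        .build();
    try (ParquetRewriter rewriter = new ParquetRewriter(options)) {
      rewriter.processBlocks(); // copies and translates the data; close() finalizes the output file
    }
  }
}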
ParquetRewriter.java

@@ -109,7 +109,7 @@
* Please note the schema of all <code>inputFiles</code> must be the same; otherwise the rewrite will fail.
* <p>
* <h2>Applying column transformations</h2>
* Some supported column transformations: pruning, masking, encrypting, changing a codec.
* Some supported column transformations: pruning, masking, renaming, encrypting, changing a codec.
* See {@link RewriteOptions} and {@link RewriteOptions.Builder} for the full list with description.
* <p>
* <h2><i>Joining</i> with extra files with a different schema</h2>
@@ -145,15 +145,18 @@ public class ParquetRewriter implements Closeable {
private final Queue<TransParquetFileReader> inputFiles = new LinkedList<>();
private final Queue<TransParquetFileReader> inputFilesToJoin = new LinkedList<>();
private final MessageType outSchema;
private final MessageType outSchemaWithRenamedColumns;
// The index cache strategy
private final IndexCache.CacheStrategy indexCacheStrategy;
private final boolean overwriteInputWithJoinColumns;
private final InternalFileEncryptor nullColumnEncryptor;
private final Map<String, String> renamedColumns;

public ParquetRewriter(RewriteOptions options) throws IOException {
this.newCodecName = options.getNewCodecName();
this.indexCacheStrategy = options.getIndexCacheStrategy();
this.overwriteInputWithJoinColumns = options.getOverwriteInputWithJoinColumns();
this.renamedColumns = options.getRenameColumns();
ParquetConfiguration conf = options.getParquetConfiguration();
OutputFile out = options.getParquetOutputFile();
inputFiles.addAll(getFileReaders(options.getParquetInputFiles(), conf));
@@ -169,6 +172,8 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
out);

this.outSchema = pruneColumnsInSchema(getSchema(), options.getPruneColumns());
// TODO: check the requirement that all renamed columns are present in outSchema
this.outSchemaWithRenamedColumns = getSchemaWithRenamedColumns(this.outSchema);
this.extraMetaData = getExtraMetadata(options);

if (options.getMaskColumns() != null) {
@@ -186,7 +191,7 @@ public ParquetRewriter(RewriteOptions options) throws IOException {
ParquetFileWriter.Mode writerMode = ParquetFileWriter.Mode.CREATE;
writer = new ParquetFileWriter(
out,
outSchema,
outSchemaWithRenamedColumns != null ? outSchemaWithRenamedColumns : outSchema,
writerMode,
DEFAULT_BLOCK_SIZE,
MAX_PADDING_SIZE_DEFAULT,
@@ -222,6 +227,7 @@ public ParquetRewriter(
MaskMode maskMode) {
this.writer = writer;
this.outSchema = outSchema;
this.outSchemaWithRenamedColumns = null;
this.newCodecName = codecName;
extraMetaData = new HashMap<>(meta.getFileMetaData().getKeyValueMetaData());
extraMetaData.put(
Expand All @@ -239,6 +245,7 @@ public ParquetRewriter(
this.indexCacheStrategy = IndexCache.CacheStrategy.NONE;
this.overwriteInputWithJoinColumns = false;
this.nullColumnEncryptor = null;
this.renamedColumns = new HashMap<>();
}

private MessageType getSchema() {
@@ -266,6 +273,27 @@ private MessageType getSchema() {
}
}

private MessageType getSchemaWithRenamedColumns(MessageType schema) {
List<Type> fields = schema.getFields().stream()
.map(type -> {
if (renamedColumns == null || !renamedColumns.containsKey(type.getName())) {
return type;
} else if (type.isPrimitive()) {
return new PrimitiveType(
type.getRepetition(),
type.asPrimitiveType().getPrimitiveTypeName(),
renamedColumns.get(type.getName()));
} else {
return new GroupType(
type.getRepetition(),
renamedColumns.get(type.getName()),
type.asGroupType().getFields());
}
})
.collect(Collectors.toList());
return new MessageType(schema.getName(), fields);
}
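
To make the transformation concrete, a small sketch with the parquet-java schema builder (field names hypothetical); renaming {"address" -> "location"} changes only the top-level name and carries the group's children over:

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

MessageType before = Types.buildMessage()
    .required(INT64).named("id")
    .requiredGroup()
        .required(BINARY).named("street")
        .named("address")
    .named("schema");

// getSchemaWithRenamedColumns(before) should then be equivalent to:
MessageType after = Types.buildMessage()
    .required(INT64).named("id")
    .requiredGroup()
        .required(BINARY).named("street")
        .named("location") // renamed group, same children
    .named("schema");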

private Map<String, String> getExtraMetadata(RewriteOptions options) {
List<TransParquetFileReader> allFiles;
if (options.getIgnoreJoinFilesMetadata()) {
@@ -421,6 +449,27 @@ public void processBlocks() throws IOException {
if (readerToJoin != null) readerToJoin.close();
}

private ColumnPath renameFieldsInPath(ColumnPath path) {
if (renamedColumns == null) {
return path;
} else {
String[] pathArray = path.toArray().clone(); // defensive copy: toArray() may expose ColumnPath's internal array
pathArray[0] = renamedColumns.getOrDefault(pathArray[0], pathArray[0]);
return ColumnPath.get(pathArray);
}
}

private PrimitiveType renameNameInType(PrimitiveType type) {
if (renamedColumns == null) {
return type;
} else {
return new PrimitiveType(
type.getRepetition(),
type.asPrimitiveType().getPrimitiveTypeName(),
renamedColumns.getOrDefault(type.getName(), type.getName()));
}
}
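
For illustration (hypothetical names), with renamedColumns = {"a" -> "x"}:

// renameFieldsInPath only rewrites the first (top-level) element of a dotted path:
ColumnPath p = ColumnPath.fromDotString("a.b.c");
// renameFieldsInPath(p) yields "x.b.c"; paths whose head is not in the map
// pass through unchanged. renameNameInType similarly swaps the primitive
// leaf's own name only when it appears in the map.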

private void processBlock(
TransParquetFileReader reader,
int blockIdx,
@@ -431,7 +480,27 @@ private void processBlock(
if (chunk.isEncrypted()) {
throw new IOException("Column " + chunk.getPath().toDotString() + " is already encrypted");
}
ColumnDescriptor descriptor = outSchema.getColumns().get(outColumnIdx);

ColumnChunkMetaData chunkColumnsRenamed = chunk;
if (renamedColumns != null && !renamedColumns.isEmpty()) {
chunkColumnsRenamed = ColumnChunkMetaData.get(
renameFieldsInPath(chunk.getPath()),
renameNameInType(chunk.getPrimitiveType()),
chunk.getCodec(),
chunk.getEncodingStats(),
chunk.getEncodings(),
chunk.getStatistics(),
chunk.getFirstDataPageOffset(),
chunk.getDictionaryPageOffset(),
chunk.getValueCount(),
chunk.getTotalSize(),
chunk.getTotalUncompressedSize(),
chunk.getSizeStatistics());
}

ColumnDescriptor descriptorOriginal = outSchema.getColumns().get(outColumnIdx);
// fall back to the original descriptor when no renamed schema was built (legacy constructor)
ColumnDescriptor descriptorRenamed = outSchemaWithRenamedColumns != null
? outSchemaWithRenamedColumns.getColumns().get(outColumnIdx)
: descriptorOriginal;
BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(blockIdx);
String originalCreatedBy = reader.getFileMetaData().getCreatedBy();

@@ -443,13 +512,21 @@ private void processBlock(
// Mask column and compress it again.
MaskMode maskMode = maskColumns.get(chunk.getPath());
if (maskMode.equals(MaskMode.NULLIFY)) {
Type.Repetition repetition = descriptor.getPrimitiveType().getRepetition();
Type.Repetition repetition =
descriptorOriginal.getPrimitiveType().getRepetition();
if (repetition.equals(Type.Repetition.REQUIRED)) {
throw new IOException(
"Required column [" + descriptor.getPrimitiveType().getName() + "] cannot be nullified");
throw new IOException("Required column ["
+ descriptorOriginal.getPrimitiveType().getName() + "] cannot be nullified");
}
nullifyColumn(
reader, blockIdx, descriptor, chunk, writer, newCodecName, encryptColumn, originalCreatedBy);
reader,
blockIdx,
descriptorOriginal,
chunk,
writer,
newCodecName,
encryptColumn,
originalCreatedBy);
} else {
throw new UnsupportedOperationException("Only nullify is supported for now");
}
@@ -462,7 +539,7 @@ private void processBlock(
}

// Translate compression and/or encryption
writer.startColumn(descriptor, chunk.getValueCount(), newCodecName);
writer.startColumn(descriptorRenamed, chunk.getValueCount(), newCodecName);
processChunk(
reader,
blockMetaData.getRowCount(),
@@ -480,7 +557,8 @@ private void processBlock(
BloomFilter bloomFilter = indexCache.getBloomFilter(chunk);
ColumnIndex columnIndex = indexCache.getColumnIndex(chunk);
OffsetIndex offsetIndex = indexCache.getOffsetIndex(chunk);
writer.appendColumnChunk(descriptor, reader.getStream(), chunk, bloomFilter, columnIndex, offsetIndex);
writer.appendColumnChunk(
descriptorRenamed, reader.getStream(), chunkColumnsRenamed, bloomFilter, columnIndex, offsetIndex);
}
}

@@ -522,7 +600,7 @@ private void processChunk(
}

if (bloomFilter != null) {
writer.addBloomFilter(chunk.getPath().toDotString(), bloomFilter);
writer.addBloomFilter(renameFieldsInPath(chunk.getPath()).toDotString(), bloomFilter);
}

reader.setStreamPosition(chunk.getStartingPos());
@@ -580,7 +658,7 @@ private void processChunk(
dataPageAAD);
statistics = convertStatistics(
originalCreatedBy,
chunk.getPrimitiveType(),
renameNameInType(chunk.getPrimitiveType()),
headerV1.getStatistics(),
columnIndex,
pageOrdinal,
@@ -648,7 +726,7 @@ private void processChunk(
dataPageAAD);
statistics = convertStatistics(
originalCreatedBy,
chunk.getPrimitiveType(),
renameNameInType(chunk.getPrimitiveType()),
headerV2.getStatistics(),
columnIndex,
pageOrdinal,
@@ -887,7 +965,7 @@ private void nullifyColumn(
CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(newCodecName);

// Create new schema that only has the current column
MessageType newSchema = newSchema(outSchema, descriptor);
MessageType newSchema = getSchemaWithRenamedColumns(newSchema(outSchema, descriptor));
ColumnChunkPageWriteStore cPageStore = new ColumnChunkPageWriteStore(
compressor,
newSchema,
RewriteOptions.java

@@ -49,6 +49,7 @@ public class RewriteOptions {
private final List<String> pruneColumns;
private final CompressionCodecName newCodecName;
private final Map<String, MaskMode> maskColumns;
private final Map<String, String> renameColumns;
private final List<String> encryptColumns;
private final FileEncryptionProperties fileEncryptionProperties;
private final IndexCache.CacheStrategy indexCacheStrategy;
@@ -63,6 +64,7 @@ private RewriteOptions(
List<String> pruneColumns,
CompressionCodecName newCodecName,
Map<String, MaskMode> maskColumns,
Map<String, String> renameColumns,
List<String> encryptColumns,
FileEncryptionProperties fileEncryptionProperties,
IndexCache.CacheStrategy indexCacheStrategy,
@@ -75,6 +77,7 @@ private RewriteOptions(
this.pruneColumns = pruneColumns;
this.newCodecName = newCodecName;
this.maskColumns = maskColumns;
this.renameColumns = renameColumns;
this.encryptColumns = encryptColumns;
this.fileEncryptionProperties = fileEncryptionProperties;
this.indexCacheStrategy = indexCacheStrategy;
@@ -192,6 +195,10 @@ public Map<String, MaskMode> getMaskColumns() {
return maskColumns;
}

public Map<String, String> getRenameColumns() {
return renameColumns;
}

public List<String> getEncryptColumns() {
return encryptColumns;
}
@@ -221,6 +228,7 @@ public static class Builder {
private List<String> pruneColumns;
private CompressionCodecName newCodecName;
private Map<String, MaskMode> maskColumns;
private Map<String, String> renameColumns;
private List<String> encryptColumns;
private FileEncryptionProperties fileEncryptionProperties;
private IndexCache.CacheStrategy indexCacheStrategy = IndexCache.CacheStrategy.NONE;
@@ -432,6 +440,19 @@ public Builder mask(Map<String, MaskMode> maskColumns) {
return this;
}

/**
* Set the columns to be renamed.
* <p>
* Note that nested columns can't be renamed; for a GroupType column, only the top-level name can be renamed.
*
* @param renameColumns map where keys are original names and values are new names
* @return self
*/
public Builder renameColumns(Map<String, String> renameColumns) {
this.renameColumns = renameColumns;
return this;
}
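
For example (hypothetical names; builder is a RewriteOptions.Builder), the validation in build() enforces the restriction described above:

// OK: renames a top-level column.
builder.renameColumns(Collections.singletonMap("level", "severity"));
// Rejected at build() time: nested names are not allowed on either side.
// builder.renameColumns(Collections.singletonMap("event.level", "event.severity"));
// Rejected at build() time: the target name must be non-empty.
// builder.renameColumns(Collections.singletonMap("level", ""));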

/**
* Set the columns to encrypt.
* <p>
@@ -561,13 +582,29 @@ public RewriteOptions build() {
!maskColumns.containsKey(pruneColumn), "Cannot prune and mask same column");
}
}

if (encryptColumns != null) {
for (String pruneColumn : pruneColumns) {
Preconditions.checkArgument(
!encryptColumns.contains(pruneColumn), "Cannot prune and encrypt same column");
}
}
if (renameColumns != null) {
for (Map.Entry<String, String> entry : renameColumns.entrySet()) {
Preconditions.checkArgument(
!pruneColumns.contains(entry.getKey()), "Cannot prune and rename same column");
}
}
}

if (renameColumns != null && !renameColumns.isEmpty()) {
for (Map.Entry<String, String> entry : renameColumns.entrySet()) {
Preconditions.checkArgument(
entry.getValue() != null && !entry.getValue().trim().isEmpty(),
"Renamed column target name can't be empty");
Preconditions.checkArgument(
!entry.getKey().contains(".") && !entry.getValue().contains("."),
"Renamed columns can't be nested; for a GroupType column, only the top-level name can be renamed");
}
}

if (encryptColumns != null && !encryptColumns.isEmpty()) {
Expand All @@ -590,6 +627,7 @@ public RewriteOptions build() {
pruneColumns,
newCodecName,
maskColumns,
renameColumns,
encryptColumns,
fileEncryptionProperties,
indexCacheStrategy,