Commit 4b3ea4b

feat: DH-18399: Add ParquetColumnResolver (#6558)
1 parent 49bfd12 commit 4b3ea4b

22 files changed: +1354 −267 lines

Util/src/main/java/io/deephaven/util/annotations/InternalUseOnly.java

+4-3
@@ -9,10 +9,11 @@
 import java.lang.annotation.Target;
 
 /**
- * Indicates that a particular method is for internal use only and should not be used by client code. It is subject to
- * change/removal at any time.
+ * Indicates that a particular {@link ElementType#METHOD method}, {@link ElementType#CONSTRUCTOR constructor},
+ * {@link ElementType#TYPE type}, or {@link ElementType#PACKAGE package} is for internal use only and should not be used
+ * by client code. It is subject to change/removal at any time.
  */
-@Target({ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.TYPE})
+@Target({ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.TYPE, ElementType.PACKAGE})
 @Inherited
 @Documented
 public @interface InternalUseOnly {
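With ElementType.PACKAGE added to the targets, the annotation can now mark an entire package as internal via a package-info.java. A minimal sketch of such a file; the annotated package name is illustrative, since this commit's diff does not show one:

// package-info.java (hypothetical example of the newly allowed PACKAGE target)
@InternalUseOnly
package io.deephaven.parquet.impl;

import io.deephaven.util.annotations.InternalUseOnly;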

extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java

+27-27
@@ -34,8 +34,9 @@
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.types.Types;
-import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
@@ -53,6 +54,11 @@
 import java.util.List;
 import java.util.stream.Collectors;
 import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;
+import static org.apache.parquet.schema.LogicalTypeAnnotation.intType;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.apache.parquet.schema.Types.buildMessage;
+import static org.apache.parquet.schema.Types.optional;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.failBecauseExceptionWasNotThrown;
 
@@ -416,8 +422,12 @@ void testColumnRenameWhileWriting() throws URISyntaxException {
         {
             final List<String> parquetFiles = getAllParquetFilesFromDataFiles(tableIdentifier);
             assertThat(parquetFiles).hasSize(1);
-            verifyFieldIdsFromParquetFile(parquetFiles.get(0), originalDefinition.getColumnNames(),
-                    nameToFieldIdFromSchema);
+            final MessageType expectedSchema = buildMessage()
+                    .addFields(
+                            optional(INT32).id(1).as(intType(32, true)).named("intCol"),
+                            optional(DOUBLE).id(2).named("doubleCol"))
+                    .named("root");
+            verifySchema(parquetFiles.get(0), expectedSchema);
         }
 
         final Table moreData = TableTools.emptyTable(5)
@@ -442,10 +452,18 @@ void testColumnRenameWhileWriting() throws URISyntaxException {
 
         final List<String> parquetFiles = getAllParquetFilesFromDataFiles(tableIdentifier);
         assertThat(parquetFiles).hasSize(2);
-        verifyFieldIdsFromParquetFile(parquetFiles.get(0), moreData.getDefinition().getColumnNames(),
-                newNameToFieldId);
-        verifyFieldIdsFromParquetFile(parquetFiles.get(1), originalDefinition.getColumnNames(),
-                nameToFieldIdFromSchema);
+        final MessageType expectedSchema0 = buildMessage()
+                .addFields(
+                        optional(INT32).id(1).as(intType(32, true)).named("newIntCol"),
+                        optional(DOUBLE).id(2).named("newDoubleCol"))
+                .named("root");
+        final MessageType expectedSchema1 = buildMessage()
+                .addFields(
+                        optional(INT32).id(1).as(intType(32, true)).named("intCol"),
+                        optional(DOUBLE).id(2).named("doubleCol"))
+                .named("root");
+        verifySchema(parquetFiles.get(0), expectedSchema0);
+        verifySchema(parquetFiles.get(1), expectedSchema1);
     }
 
     // TODO: This is failing because we don't map columns based on the column ID when reading. Uncomment this
@@ -455,31 +473,13 @@ void testColumnRenameWhileWriting() throws URISyntaxException {
         // moreData.renameColumns("intCol = newIntCol", "doubleCol = newDoubleCol")), fromIceberg);
     }
 
-    /**
-     * Verify that the schema of the parquet file read from the provided path has the provided column and corresponding
-     * field IDs.
-     */
-    private void verifyFieldIdsFromParquetFile(
-            final String path,
-            final List<String> columnNames,
-            final Map<String, Integer> nameToFieldId) throws URISyntaxException {
+    private void verifySchema(String path, MessageType expectedSchema) throws URISyntaxException {
        final ParquetMetadata metadata =
                new ParquetTableLocationKey(new URI(path), 0, null, ParquetInstructions.builder()
                        .setSpecialInstructions(dataInstructions())
                        .build())
                        .getMetadata();
-        final List<ColumnDescriptor> columnsMetadata = metadata.getFileMetaData().getSchema().getColumns();
-
-        final int numColumns = columnNames.size();
-        for (int colIdx = 0; colIdx < numColumns; colIdx++) {
-            final String columnName = columnNames.get(colIdx);
-            final String columnNameFromParquetFile = columnsMetadata.get(colIdx).getPath()[0];
-            assertThat(columnName).isEqualTo(columnNameFromParquetFile);
-
-            final int expectedFieldId = nameToFieldId.get(columnName);
-            final int fieldIdFromParquetFile = columnsMetadata.get(colIdx).getPrimitiveType().getId().intValue();
-            assertThat(fieldIdFromParquetFile).isEqualTo(expectedFieldId);
-        }
+        assertThat(metadata.getFileMetaData().getSchema()).isEqualTo(expectedSchema);
     }
 
     /**
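Replacing the per-column field-id spot checks with a single schema-equality assertion is a strictly stronger check: parquet-mr's Type#equals compares the whole type tree, including names, repetition, field ids, and logical type annotations. A standalone sketch of the Types builder pattern the test now relies on (names and ids mirror the expected schemas above):

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;

// Equivalent to expectedSchema in the first block: two optional leaves carrying
// the Iceberg field ids 1 and 2 attached via .id(...)
MessageType expected = Types.buildMessage()
        .addFields(
                Types.optional(INT32).id(1).as(LogicalTypeAnnotation.intType(32, true)).named("intCol"),
                Types.optional(DOUBLE).id(2).named("doubleCol"))
        .named("root");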

extensions/parquet/base/build.gradle

+12-1
@@ -22,5 +22,16 @@ dependencies {
     implementation libs.guava
 
     compileOnly libs.jetbrains.annotations
-    testImplementation libs.junit4
+
+    testImplementation libs.assertj
+
+    testImplementation platform(libs.junit.bom)
+    testImplementation libs.junit.jupiter
+    testRuntimeOnly libs.junit.jupiter.engine
+    testRuntimeOnly libs.junit.platform.launcher
+}
+
+tasks.withType(Test).configureEach {
+    useJUnitPlatform {
+    }
 }
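This moves the module's tests from JUnit 4 to the JUnit Platform with the Jupiter engine; the useJUnitPlatform block is what lets Gradle discover Jupiter tests at all. A minimal sketch of the test style this configuration enables (class name and assertion are illustrative, not from this commit):

import org.junit.jupiter.api.Test;
import static org.assertj.core.api.Assertions.assertThat;

class ExampleJupiterTest {
    @Test
    void runsOnTheJUnitPlatform() {
        // assertj is now on the test classpath alongside junit-jupiter
        assertThat(1 + 1).isEqualTo(2);
    }
}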

extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnChunkReaderImpl.java

+3-2
@@ -4,6 +4,7 @@
 package io.deephaven.parquet.base;
 
 import io.deephaven.UncheckedDeephavenException;
+import io.deephaven.parquet.impl.ParquetSchemaUtil;
 import io.deephaven.util.channel.SeekableChannelContext;
 import io.deephaven.util.channel.SeekableChannelsProvider;
 import io.deephaven.parquet.compress.CompressorAdapter;
@@ -68,8 +69,8 @@ final class ColumnChunkReaderImpl implements ColumnChunkReader {
         this.columnName = columnName;
         this.channelsProvider = channelsProvider;
         this.columnChunk = columnChunk;
-        this.path = type
-                .getColumnDescription(columnChunk.meta_data.getPath_in_schema().toArray(new String[0]));
+        this.path =
+                ParquetSchemaUtil.columnDescriptor(type, columnChunk.meta_data.getPath_in_schema()).orElseThrow();
         if (columnChunk.getMeta_data().isSetCodec()) {
             decompressor = DeephavenCompressorAdapterFactory.getInstance()
                     .getByName(columnChunk.getMeta_data().getCodec().name());
.getByName(columnChunk.getMeta_data().getCodec().name());

extensions/parquet/base/src/main/java/io/deephaven/parquet/base/ColumnWriterImpl.java

+5-4
@@ -32,6 +32,7 @@
 import java.nio.channels.Channels;
 import java.nio.channels.WritableByteChannel;
 import java.util.EnumSet;
+import java.util.Objects;
 import java.util.Set;
 
 import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
@@ -76,11 +77,11 @@ final class ColumnWriterImpl implements ColumnWriter {
             final CompressorAdapter compressorAdapter,
             final int targetPageSize,
             final ByteBufferAllocator allocator) {
-        this.countingOutput = countingOutput;
-        this.column = column;
-        this.compressorAdapter = compressorAdapter;
+        this.countingOutput = Objects.requireNonNull(countingOutput);
+        this.column = Objects.requireNonNull(column);
+        this.compressorAdapter = Objects.requireNonNull(compressorAdapter);
         this.targetPageSize = targetPageSize;
-        this.allocator = allocator;
+        this.allocator = Objects.requireNonNull(allocator);
         dlEncoder = column.getMaxDefinitionLevel() == 0 ? null
                 : new RunLengthBitPackingHybridEncoder(
                         getWidthFromMaxInt(column.getMaxDefinitionLevel()), MIN_SLAB_SIZE, targetPageSize, allocator);

extensions/parquet/base/src/main/java/io/deephaven/parquet/base/RowGroupWriterImpl.java

+13-11
@@ -5,6 +5,7 @@
 
 import com.google.common.io.CountingOutputStream;
 import io.deephaven.parquet.compress.CompressorAdapter;
+import io.deephaven.parquet.impl.ParquetSchemaUtil;
 import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -16,10 +17,11 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
 
 final class RowGroupWriterImpl implements RowGroupWriter {
     private final CountingOutputStream countingOutput;
-    private final MessageType type;
+    private final MessageType schema;
     private final int targetPageSize;
     private final ByteBufferAllocator allocator;
     private ColumnWriterImpl activeWriter;
@@ -28,33 +30,33 @@ final class RowGroupWriterImpl implements RowGroupWriter {
     private final CompressorAdapter compressorAdapter;
 
     RowGroupWriterImpl(CountingOutputStream countingOutput,
-            MessageType type,
+            MessageType schema,
             int targetPageSize,
             ByteBufferAllocator allocator,
             CompressorAdapter compressorAdapter) {
-        this(countingOutput, type, targetPageSize, allocator, new BlockMetaData(), compressorAdapter);
+        this(countingOutput, schema, targetPageSize, allocator, new BlockMetaData(), compressorAdapter);
     }
 
 
     private RowGroupWriterImpl(CountingOutputStream countingOutput,
-            MessageType type,
+            MessageType schema,
             int targetPageSize,
             ByteBufferAllocator allocator,
             BlockMetaData blockMetaData,
             CompressorAdapter compressorAdapter) {
-        this.countingOutput = countingOutput;
-        this.type = type;
+        this.countingOutput = Objects.requireNonNull(countingOutput);
+        this.schema = Objects.requireNonNull(schema);
         this.targetPageSize = targetPageSize;
-        this.allocator = allocator;
-        this.blockMetaData = blockMetaData;
-        this.compressorAdapter = compressorAdapter;
+        this.allocator = Objects.requireNonNull(allocator);
+        this.blockMetaData = Objects.requireNonNull(blockMetaData);
+        this.compressorAdapter = Objects.requireNonNull(compressorAdapter);
     }
 
     String[] getPrimitivePath(String columnName) {
         String[] result = {columnName};
 
         Type rollingType;
-        while (!(rollingType = type.getType(result)).isPrimitive()) {
+        while (!(rollingType = schema.getType(result)).isPrimitive()) {
             GroupType groupType = rollingType.asGroupType();
             if (groupType.getFieldCount() != 1) {
                 throw new UnsupportedOperationException("Encountered struct at:" + Arrays.toString(result));
@@ -74,7 +76,7 @@ public ColumnWriter addColumn(String columnName) {
         }
         activeWriter = new ColumnWriterImpl(this,
                 countingOutput,
-                type.getColumnDescription(getPrimitivePath(columnName)),
+                ParquetSchemaUtil.columnDescriptor(schema, getPrimitivePath(columnName)).orElseThrow(),
                 compressorAdapter,
                 targetPageSize,
                 allocator);
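getPrimitivePath walks from a top-level column name down through single-field groups until it reaches the primitive leaf, rejecting multi-field structs along the way. An illustrative example of what the walk produces; the schema shape here is an assumption for illustration, not taken from this commit:

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;

// optional group wrapper { optional int32 leaf } -- a single-field group chain
MessageType schema = Types.buildMessage()
        .addFields(Types.optionalGroup()
                .addFields(Types.optional(INT32).named("leaf"))
                .named("wrapper"))
        .named("root");
// getPrimitivePath("wrapper") would yield {"wrapper", "leaf"}: the leaf path
// handed to ParquetSchemaUtil.columnDescriptor in addColumn. A group with two
// or more fields would instead throw UnsupportedOperationException.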
extensions/parquet/base/src/main/java/io/deephaven/parquet/impl/ColumnDescriptorUtil.java

+24-0

@@ -0,0 +1,24 @@
+//
+// Copyright (c) 2016-2024 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.parquet.impl;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.jetbrains.annotations.NotNull;
+
+import javax.annotation.Nullable;
+
+public final class ColumnDescriptorUtil {
+    /**
+     * A more thorough check of {@link ColumnDescriptor} equality. In addition to
+     * {@link ColumnDescriptor#equals(Object)} which only checks the {@link ColumnDescriptor#getPath()}, this also
+     * checks for the equality of {@link ColumnDescriptor#getPrimitiveType()},
+     * {@link ColumnDescriptor#getMaxRepetitionLevel()}, and {@link ColumnDescriptor#getMaxDefinitionLevel()}.
+     */
+    public static boolean equals(@NotNull ColumnDescriptor x, @Nullable ColumnDescriptor y) {
+        return x == y || (x.equals(y)
+                && x.getPrimitiveType().equals(y.getPrimitiveType())
+                && x.getMaxRepetitionLevel() == y.getMaxRepetitionLevel()
+                && x.getMaxDefinitionLevel() == y.getMaxDefinitionLevel());
+    }
+}
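A hypothetical usage sketch: resolving the same column from two schemas and comparing the descriptors structurally, which catches type or nesting drift that path-only ColumnDescriptor#equals would miss (the helper name and column path are illustrative):

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;

// Returns true when the path resolves to a structurally identical descriptor in
// both schemas: same path, primitive type, and max repetition/definition levels
static boolean sameColumnShape(MessageType schemaA, MessageType schemaB, String... path) {
    ColumnDescriptor a = schemaA.getColumnDescription(path);
    ColumnDescriptor b = schemaB.getColumnDescription(path);
    return ColumnDescriptorUtil.equals(a, b);
}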
