From 08b793673c9df12c12ba583f0f39e2f012eb18c2 Mon Sep 17 00:00:00 2001 From: xianyangliu Date: Thu, 6 Feb 2025 22:24:52 +0800 Subject: [PATCH] fixes #3122 --- .../parquet/column/page/DataPageV2.java | 46 +++++++++++++++-- .../org/apache/parquet/column/page/Page.java | 3 ++ .../column/impl/TestColumnReaderImpl.java | 43 ++++++++++++++++ .../converter/ParquetMetadataConverter.java | 45 ++++++----------- .../hadoop/ColumnChunkPageWriteStore.java | 19 ++++--- .../parquet/hadoop/ParquetFileReader.java | 1 + .../parquet/hadoop/ParquetFileWriter.java | 7 ++- .../parquet/encodings/FileEncodingsIT.java | 1 + .../parquet/hadoop/TestParquetWriter.java | 49 +++++++++++++++++++ 9 files changed, 169 insertions(+), 45 deletions(-) diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java index ebb678d911..01b5d891b5 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/page/DataPageV2.java @@ -53,6 +53,7 @@ public static DataPageV2 uncompressed( definitionLevels, dataEncoding, data, + 0, Math.toIntExact(repetitionLevels.size() + definitionLevels.size() + data.size()), statistics, false); @@ -89,6 +90,7 @@ public static DataPageV2 uncompressed( definitionLevels, dataEncoding, data, + 0, Math.toIntExact(repetitionLevels.size() + definitionLevels.size() + data.size()), statistics, false); @@ -124,6 +126,7 @@ public static DataPageV2 compressed( definitionLevels, dataEncoding, data, + Math.toIntExact(repetitionLevels.size() + definitionLevels.size() + data.size()), uncompressedSize, statistics, true); @@ -138,6 +141,10 @@ public static DataPageV2 compressed( private final Statistics statistics; private final boolean isCompressed; + /** + * @deprecated will be removed in 2.0.0. 
Use {@link DataPageV2#DataPageV2(int, int, int, long, BytesInput, BytesInput, Encoding, BytesInput, int, int, Statistics, boolean)} instead + */ + @Deprecated public DataPageV2( int rowCount, int nullCount, @@ -163,6 +170,33 @@ public DataPageV2( this.isCompressed = isCompressed; } + public DataPageV2( + int rowCount, + int nullCount, + int valueCount, + BytesInput repetitionLevels, + BytesInput definitionLevels, + Encoding dataEncoding, + BytesInput data, + int compressedSize, + int uncompressedSize, + Statistics statistics, + boolean isCompressed) { + super(compressedSize, uncompressedSize, valueCount); + if (!isCompressed && compressedSize != 0) { + throw new IllegalArgumentException("compressedSize must be 0 if page is not compressed"); + } + + this.rowCount = rowCount; + this.nullCount = nullCount; + this.repetitionLevels = repetitionLevels; + this.definitionLevels = definitionLevels; + this.dataEncoding = dataEncoding; + this.data = data; + this.statistics = statistics; + this.isCompressed = isCompressed; + } + private DataPageV2( int rowCount, int nullCount, @@ -172,14 +206,11 @@ private DataPageV2( BytesInput definitionLevels, Encoding dataEncoding, BytesInput data, + int compressedSize, int uncompressedSize, Statistics statistics, boolean isCompressed) { - super( - Math.toIntExact(repetitionLevels.size() + definitionLevels.size() + data.size()), - uncompressedSize, - valueCount, - firstRowIndex); + super(compressedSize, uncompressedSize, valueCount, firstRowIndex); this.rowCount = rowCount; this.nullCount = nullCount; this.repetitionLevels = repetitionLevels; @@ -190,6 +221,11 @@ private DataPageV2( this.isCompressed = isCompressed; } + @Override + public int getCompressedSize() { + return isCompressed ? super.getCompressedSize() : 0; + } + public int getRowCount() { return rowCount; } diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java b/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java index 1d92b45ce7..bd91fbc88b 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java @@ -34,6 +34,9 @@ public abstract class Page { this.uncompressedSize = uncompressedSize; } + /** + * @return the compressed size of the page when the bytes are compressed, otherwise return 0 + */ public int getCompressedSize() { return compressedSize; } diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java index ac6818952a..bf0cc08a1d 100644 --- a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java +++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java @@ -19,6 +19,7 @@ package org.apache.parquet.column.impl; import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertFalse; import static org.apache.parquet.column.ParquetProperties.WriterVersion.PARQUET_2_0; import java.util.List; @@ -135,4 +136,46 @@ public void testOptional() throws Exception { } assertEquals(0, converter.count); } + + @Test + public void testV2AllNullValues() throws Exception { + MessageType schema = MessageTypeParser.parseMessageType("message test { optional binary foo; }"); + ColumnDescriptor col = schema.getColumns().get(0); + MemPageWriter pageWriter = new MemPageWriter(); + ColumnWriterV2 columnWriterV2 = new ColumnWriterV2( + col, + pageWriter, + 
ParquetProperties.builder() + .withDictionaryPageSize(1024) + .withWriterVersion(PARQUET_2_0) + .withPageSize(2048) + .build()); + for (int i = 0; i < rows; i++) { + columnWriterV2.writeNull(0, 0); + } + columnWriterV2.writePage(); + columnWriterV2.finalizeColumnChunk(); + List pages = pageWriter.getPages(); + int valueCount = 0; + int rowCount = 0; + for (DataPage dataPage : pages) { + DataPageV2 page = (DataPageV2) dataPage; + valueCount += page.getValueCount(); + rowCount += page.getRowCount(); + assertFalse(page.isCompressed()); + assertEquals(0, page.getCompressedSize()); + } + assertEquals(rows, rowCount); + assertEquals(rows, valueCount); + MemPageReader pageReader = new MemPageReader(rows, pages.iterator(), pageWriter.getDictionaryPage()); + ValidatingConverter converter = new ValidatingConverter(); + ColumnReader columnReader = + new ColumnReaderImpl(col, pageReader, converter, VersionParser.parse(Version.FULL_VERSION)); + for (int i = 0; i < rows; i++) { + assertEquals(0, columnReader.getCurrentRepetitionLevel()); + assertEquals(0, columnReader.getCurrentDefinitionLevel()); + columnReader.consume(); + } + assertEquals(0, converter.count); + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index e72f2c33a2..35111a6611 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -2123,6 +2123,9 @@ private PageHeader newDataPageV2Header( int dlByteLength) { DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2( valueCount, nullCount, rowCount, getEncoding(dataEncoding), dlByteLength, rlByteLength); + if (compressedSize == 0) { + dataPageHeaderV2.setIs_compressed(false); + } PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize); pageHeader.setData_page_header_v2(dataPageHeaderV2); return pageHeader; @@ -2142,38 +2145,18 @@ public void writeDataPageV2Header( BlockCipher.Encryptor blockEncryptor, byte[] pageHeaderAAD) throws IOException { - writePageHeader( - newDataPageV2Header( - uncompressedSize, - compressedSize, - valueCount, - nullCount, - rowCount, - dataEncoding, - rlByteLength, - dlByteLength, - crc), - to, - blockEncryptor, - pageHeaderAAD); - } - - private PageHeader newDataPageV2Header( - int uncompressedSize, - int compressedSize, - int valueCount, - int nullCount, - int rowCount, - org.apache.parquet.column.Encoding dataEncoding, - int rlByteLength, - int dlByteLength, - int crc) { - DataPageHeaderV2 dataPageHeaderV2 = new DataPageHeaderV2( - valueCount, nullCount, rowCount, getEncoding(dataEncoding), dlByteLength, rlByteLength); - PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE_V2, uncompressedSize, compressedSize); - pageHeader.setData_page_header_v2(dataPageHeaderV2); + PageHeader pageHeader = newDataPageV2Header( + uncompressedSize, + compressedSize, + valueCount, + nullCount, + rowCount, + dataEncoding, + rlByteLength, + dlByteLength); pageHeader.setCrc(crc); - return pageHeader; + + writePageHeader(pageHeader, to, blockEncryptor, pageHeaderAAD); } public void writeDictionaryPageHeader( diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java index 795063e5c8..b7732712e9 100644 
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java @@ -295,14 +295,19 @@ public void writePageV2( int rlByteLength = toIntWithCheck(repetitionLevels.size()); int dlByteLength = toIntWithCheck(definitionLevels.size()); int uncompressedSize = toIntWithCheck(data.size() + repetitionLevels.size() + definitionLevels.size()); - // TODO: decide if we compress - BytesInput compressedData = compressor.compress(data); - if (null != pageBlockEncryptor) { - AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal); - compressedData = BytesInput.from(pageBlockEncryptor.encrypt(compressedData.toByteArray(), dataPageAAD)); + BytesInput compressedData = BytesInput.empty(); + int compressedSize = 0; + if (data.size() > 0) { + // TODO: decide if we compress + compressedData = compressor.compress(data); + if (null != pageBlockEncryptor) { + AesCipher.quickUpdatePageAAD(dataPageAAD, pageOrdinal); + compressedData = + BytesInput.from(pageBlockEncryptor.encrypt(compressedData.toByteArray(), dataPageAAD)); + } + compressedSize = + toIntWithCheck(compressedData.size() + repetitionLevels.size() + definitionLevels.size()); } - int compressedSize = - toIntWithCheck(compressedData.size() + repetitionLevels.size() + definitionLevels.size()); tempOutputStream.reset(); if (null != headerBlockEncryptor) { AesCipher.quickUpdatePageAAD(dataPageHeaderAAD, pageOrdinal); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 785f145b20..68cf2ed6a9 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -1969,6 +1969,7 @@ public ColumnChunkPageReader readAllPages( definitionLevels, converter.getEncoding(dataHeaderV2.getEncoding()), values, + dataHeaderV2.isIs_compressed() ? 
compressedPageSize : 0, uncompressedPageSize, converter.fromParquetStatistics( getFileMetaData().getCreatedBy(), dataHeaderV2.getStatistics(), type), diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index f0a912f599..80794a864c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -1150,8 +1150,11 @@ public void writeDataPageV2( int rlByteLength = toIntWithCheck(repetitionLevels.size(), "page repetition levels"); int dlByteLength = toIntWithCheck(definitionLevels.size(), "page definition levels"); - int compressedSize = - toIntWithCheck(compressedData.size() + repetitionLevels.size() + definitionLevels.size(), "page"); + int compressedSize = 0; + if (compressedData.size() > 0) { + compressedSize = + toIntWithCheck(compressedData.size() + repetitionLevels.size() + definitionLevels.size(), "page"); + } int uncompressedSize = toIntWithCheck(uncompressedDataSize + repetitionLevels.size() + definitionLevels.size(), "page"); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java b/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java index f2e6e16fcc..5c74fb3028 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/encodings/FileEncodingsIT.java @@ -452,6 +452,7 @@ public DataPage visit(DataPageV2 data) { BytesInput.from(data.getDefinitionLevels().toByteArray()), data.getDataEncoding(), BytesInput.from(data.getData().toByteArray()), + data.isCompressed() ? data.getCompressedSize() : 0, data.getUncompressedSize(), data.getStatistics(), data.isCompressed()); diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java index 64001bcaf2..9d22cc0bbe 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetWriter.java @@ -33,9 +33,11 @@ import static org.apache.parquet.schema.MessageTypeParser.parseMessageType; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; import static org.apache.parquet.schema.Type.Repetition.REQUIRED; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -60,6 +62,8 @@ import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.GroupFactory; import org.apache.parquet.example.data.simple.SimpleGroupFactory; +import org.apache.parquet.format.PageHeader; +import org.apache.parquet.format.Util; import org.apache.parquet.hadoop.example.ExampleParquetWriter; import org.apache.parquet.hadoop.example.GroupReadSupport; import org.apache.parquet.hadoop.example.GroupWriteSupport; @@ -592,4 +596,49 @@ public void testSizeStatisticsControl() throws Exception { } } } + + @Test + public void testV2WriteAllNullValues() throws Exception { + MessageType schema = 
Types.buildMessage().optional(FLOAT).named("float").named("msg"); + + Configuration conf = new Configuration(); + GroupWriteSupport.setSchema(schema, conf); + + File file = temp.newFile(); + temp.delete(); + Path path = new Path(file.getAbsolutePath()); + + SimpleGroupFactory factory = new SimpleGroupFactory(schema); + Group nullValue = factory.newGroup(); + int recordCount = 10; + + try (ParquetWriter writer = ExampleParquetWriter.builder(path) + .withAllocator(allocator) + .withConf(conf) + .withWriterVersion(WriterVersion.PARQUET_2_0) + .withDictionaryEncoding(false) + .build()) { + for (int i = 0; i < recordCount; i++) { + writer.write(nullValue); + } + } + + try (ParquetReader reader = + ParquetReader.builder(new GroupReadSupport(), path).build()) { + int readRecordCount = 0; + for (Group group = reader.read(); group != null; group = reader.read()) { + assertEquals(nullValue.toString(), group.toString()); + ++readRecordCount; + } + assertEquals("Number of written records should be equal to the read one", recordCount, readRecordCount); + } + + try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) { + BlockMetaData blockMetaData = reader.getFooter().getBlocks().get(0); + reader.f.seek(blockMetaData.getStartingPos()); + PageHeader pageHeader = Util.readPageHeader(reader.f); + assertFalse(pageHeader.getData_page_header_v2().isIs_compressed()); + assertEquals(0, pageHeader.getCompressed_page_size()); + } + } }
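
A minimal standalone sketch of the contract this patch gives DataPageV2 (the class name DataPageV2SizeExample and the literal level/data bytes are illustrative only; the uncompressed/compressed factories, isCompressed() and getCompressedSize() are the APIs touched by the patch): an uncompressed V2 page, such as an all-null page whose data section is empty, now reports a compressed size of 0, while a compressed page keeps reporting its real compressed size.

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

public class DataPageV2SizeExample {
  public static void main(String[] args) {
    PrimitiveType type = Types.optional(BINARY).named("foo");
    Statistics<?> stats = Statistics.createStats(type);

    // Uncompressed page with an empty data section (e.g. all values are null for an
    // optional column): the factory now passes compressedSize = 0, so the overridden
    // getCompressedSize() reports 0 instead of the levels' byte length.
    DataPageV2 uncompressedPage = DataPageV2.uncompressed(
        10, // rowCount
        10, // nullCount
        10, // valueCount
        BytesInput.empty(), // repetition levels
        BytesInput.from(new byte[] {3, 0}), // definition levels (illustrative bytes)
        Encoding.PLAIN,
        BytesInput.empty(), // empty data section
        stats);
    System.out.println(uncompressedPage.isCompressed()); // false
    System.out.println(uncompressedPage.getCompressedSize()); // 0

    // Compressed page: getCompressedSize() still reports levels + compressed data size.
    DataPageV2 compressedPage = DataPageV2.compressed(
        10, // rowCount
        0, // nullCount
        10, // valueCount
        BytesInput.empty(), // repetition levels
        BytesInput.from(new byte[] {3, 0}), // definition levels (illustrative bytes)
        Encoding.PLAIN,
        BytesInput.from(new byte[] {1, 2, 3}), // stand-in for compressed data
        64, // uncompressedSize
        stats);
    System.out.println(compressedPage.isCompressed()); // true
    System.out.println(compressedPage.getCompressedSize()); // 5 (0 + 2 + 3)
  }
}

The writer and reader changes in this patch follow the same contract: ColumnChunkPageWriteStore.writePageV2 and ParquetFileWriter.writeDataPageV2 skip the compressor and record a compressed size of 0 when the data section is empty, ParquetMetadataConverter then sets is_compressed = false on the DataPageHeaderV2, and ParquetFileReader passes 0 back as the compressed size whenever the header says the page is not compressed.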