From 0a0a00c8b1e8ec54e16eb20cc3f84904a4e6d5bd Mon Sep 17 00:00:00 2001 From: Nemon Lou Date: Sat, 28 Aug 2021 19:10:38 +0800 Subject: [PATCH 01/10] PARQUET-2078 Failed to read parquet file after writing with the same parquet version --- .../converter/ParquetMetadataConverter.java | 18 +--- .../parquet/hadoop/ParquetFileWriter.java | 1 + .../parquet/hadoop/TestParquetFileWriter.java | 85 +++++++++++++++++++ 3 files changed, 88 insertions(+), 16 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 3a10b1c895..918263ccfe 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -1228,13 +1228,7 @@ static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMet List newRowGroups = new ArrayList(); for (RowGroup rowGroup : rowGroups) { long totalSize = 0; - long startIndex; - - if (rowGroup.isSetFile_offset()) { - startIndex = rowGroup.getFile_offset(); - } else { - startIndex = getOffset(rowGroup.getColumns().get(0)); - } + long startIndex = getOffset(rowGroup); if (rowGroup.isSetTotal_compressed_size()) { totalSize = rowGroup.getTotal_compressed_size(); @@ -1259,12 +1253,7 @@ static FileMetaData filterFileMetaDataByStart(FileMetaData metaData, OffsetMetad List rowGroups = metaData.getRow_groups(); List newRowGroups = new ArrayList(); for (RowGroup rowGroup : rowGroups) { - long startIndex; - if (rowGroup.isSetFile_offset()) { - startIndex = rowGroup.getFile_offset(); - } else { - startIndex = getOffset(rowGroup.getColumns().get(0)); - } + long startIndex = getOffset(rowGroup); if (filter.contains(startIndex)) { newRowGroups.add(rowGroup); @@ -1275,9 +1264,6 @@ static FileMetaData filterFileMetaDataByStart(FileMetaData metaData, OffsetMetad } static long getOffset(RowGroup rowGroup) { - if (rowGroup.isSetFile_offset()) { - return rowGroup.getFile_offset(); - } return getOffset(rowGroup.getColumns().get(0)); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java index afbdf7637e..2e5d55c28c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java @@ -852,6 +852,7 @@ public void endColumn() throws IOException { this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength); this.uncompressedLength = 0; this.compressedLength = 0; + this.currentChunkDictionaryPageOffset = 0; columnIndexBuilder = null; offsetIndexBuilder = null; } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index 5b8c5ed1b4..49e071e525 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -23,6 +23,10 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.Version; import org.apache.parquet.bytes.BytesUtils; @@ -30,6 +34,8 @@ import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter; import org.apache.parquet.column.values.bloomfilter.BloomFilter; import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.apache.parquet.hadoop.util.ContextUtil; import org.apache.parquet.io.ParquetEncodingException; import org.junit.Assume; import org.junit.Rule; @@ -67,6 +73,7 @@ import static org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics; import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE; +import static org.apache.parquet.hadoop.ParquetInputFormat.READ_SUPPORT_CLASS; import static org.junit.Assert.*; import static org.apache.parquet.column.Encoding.BIT_PACKED; import static org.apache.parquet.column.Encoding.PLAIN; @@ -75,6 +82,7 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; import static org.apache.parquet.schema.Type.Repetition.*; import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir; +import static org.junit.Assert.assertEquals; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.SimpleGroup; @@ -191,6 +199,7 @@ public void testWriteRead() throws Exception { assertEquals(c2Ends - c2Starts, rowGroup.getColumns().get(1).getTotalSize()); assertEquals(c2Ends - c1Starts, rowGroup.getTotalByteSize()); + assertEquals(c1Starts, rowGroup.getColumns().get(0).getStartingPos()); assertEquals(0, rowGroup.getColumns().get(0).getDictionaryPageOffset()); assertEquals(c1p1Starts, rowGroup.getColumns().get(0).getFirstDataPageOffset()); @@ -239,6 +248,82 @@ public void testWriteRead() throws Exception { PrintFooter.main(new String[] {path.toString()}); } + @Test + public void testWriteReadWithRecordReader() throws Exception { + File testFile = temp.newFile(); + testFile.delete(); + + Path path = new Path(testFile.toURI()); + Configuration configuration = new Configuration(); + + ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path); + w.start(); + w.startBlock(3); + w.startColumn(C1, 5, CODEC); + w.writeDataPage(2, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); + w.writeDataPage(3, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + w.startColumn(C2, 6, CODEC); + long c2Starts = w.getPos(); + w.writeDictionaryPage(new DictionaryPage(BytesInput.from(BYTES2), 4, RLE_DICTIONARY)); + long c2p1Starts = w.getPos(); + w.writeDataPage(2, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); + w.writeDataPage(3, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); + w.writeDataPage(1, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + long c2Ends = w.getPos(); + w.endBlock(); + w.startBlock(4); + w.startColumn(C1, 7, CODEC); + long c1Bock2Starts = w.getPos(); + long c1p1Bock2Starts = w.getPos(); + w.writeDataPage(7, 4, BytesInput.from(BYTES3), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + long c1Block2Ends = w.getPos(); + w.startColumn(C2, 8, CODEC); + w.writeDataPage(8, 4, BytesInput.from(BYTES4), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); + w.endColumn(); + w.endBlock(); + w.end(new HashMap()); + + ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path); + assertEquals("footer: "+ readFooter, 2, readFooter.getBlocks().size()); + BlockMetaData rowGroup = readFooter.getBlocks().get(0); + assertEquals(c2Ends - c2Starts, rowGroup.getColumns().get(1).getTotalSize()); + + + assertEquals(0, rowGroup.getColumns().get(0).getDictionaryPageOffset()); + assertEquals(c2Starts, rowGroup.getColumns().get(1).getStartingPos()); + assertEquals(c2Starts, rowGroup.getColumns().get(1).getDictionaryPageOffset()); + assertEquals(c2p1Starts, rowGroup.getColumns().get(1).getFirstDataPageOffset()); + + BlockMetaData rowGroup2 = readFooter.getBlocks().get(1); + assertEquals(0, rowGroup2.getColumns().get(0).getDictionaryPageOffset()); + assertEquals(c1Bock2Starts, rowGroup2.getColumns().get(0).getStartingPos()); + assertEquals(c1p1Bock2Starts, rowGroup2.getColumns().get(0).getFirstDataPageOffset()); + assertEquals(c1Block2Ends - c1Bock2Starts, rowGroup2.getColumns().get(0).getTotalSize()); + + HashSet expectedEncoding=new HashSet(); + expectedEncoding.add(PLAIN); + expectedEncoding.add(BIT_PACKED); + assertEquals(expectedEncoding,rowGroup.getColumns().get(0).getEncodings()); + + ParquetInputSplit split = new ParquetInputSplit(path, 0, w.getPos(),null, + readFooter.getBlocks(), SCHEMA.toString(), + readFooter.getFileMetaData().getSchema().toString(), + readFooter.getFileMetaData().getKeyValueMetaData(), + null); + ParquetInputFormat input = new ParquetInputFormat(); + configuration.set(READ_SUPPORT_CLASS, GroupReadSupport.class.getName()); + TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt_0_1_m_1_1"); + TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(configuration, taskAttemptID); + RecordReader reader = input.createRecordReader(split, taskContext); + assertTrue(reader instanceof ParquetRecordReader); + //RowGroup.file_offset is checked here + reader.initialize(split, taskContext); + reader.close(); + } + @Test public void testWriteEmptyBlock() throws Exception { File testFile = temp.newFile(); From 12f39c3b7d42abc127572a3bc06a8d3436058126 Mon Sep 17 00:00:00 2001 From: loudongfeng Date: Tue, 31 Aug 2021 16:56:16 +0800 Subject: [PATCH 02/10] PARQUET-2078 Failed to read parquet file after writing with the same parquet version Read path fix that make usage of this information: RowGroup[n].file_offset = RowGroup[n-1].file_offset + RowGroup[n-1].total_compressed_size --- .../converter/ParquetMetadataConverter.java | 45 ++++++++++++++++++- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 918263ccfe..6d87e9964a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -1226,9 +1226,28 @@ public ParquetMetadata readParquetMetadata(InputStream from) throws IOException static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) { List rowGroups = metaData.getRow_groups(); List newRowGroups = new ArrayList(); + long preStartIndex = 0; + long preCompressedSize = 0; for (RowGroup rowGroup : rowGroups) { long totalSize = 0; - long startIndex = getOffset(rowGroup); + long startIndex; + if (rowGroup.isSetFile_offset()) { + assert rowGroup.isSetTotal_compressed_size(); + + if (preStartIndex == 0) { + //the first block always holds the truth + startIndex = rowGroup.getFile_offset(); + } else { + //calculate offset for other blocks + startIndex = preStartIndex + preCompressedSize; + } + + preStartIndex = startIndex; + preCompressedSize = rowGroup.getTotal_compressed_size(); + + } else { + startIndex = getOffset(rowGroup.getColumns().get(0)); + } if (rowGroup.isSetTotal_compressed_size()) { totalSize = rowGroup.getTotal_compressed_size(); @@ -1252,8 +1271,27 @@ static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMet static FileMetaData filterFileMetaDataByStart(FileMetaData metaData, OffsetMetadataFilter filter) { List rowGroups = metaData.getRow_groups(); List newRowGroups = new ArrayList(); + long preStartIndex = 0; + long preCompressedSize = 0; for (RowGroup rowGroup : rowGroups) { - long startIndex = getOffset(rowGroup); + long startIndex; + if (rowGroup.isSetFile_offset()) { + assert rowGroup.isSetTotal_compressed_size(); + + if (preStartIndex == 0) { + //the first block always holds the truth + startIndex = rowGroup.getFile_offset(); + } else { + //calculate offset for other blocks + startIndex = preStartIndex + preCompressedSize; + } + + preStartIndex = startIndex; + preCompressedSize = rowGroup.getTotal_compressed_size(); + + } else { + startIndex = getOffset(rowGroup.getColumns().get(0)); + } if (filter.contains(startIndex)) { newRowGroups.add(rowGroup); @@ -1264,6 +1302,9 @@ static FileMetaData filterFileMetaDataByStart(FileMetaData metaData, OffsetMetad } static long getOffset(RowGroup rowGroup) { + if (rowGroup.isSetFile_offset()) { + return rowGroup.getFile_offset(); + } return getOffset(rowGroup.getColumns().get(0)); } From 0328fb772fdb24eb058d93a1097e87367cd0321e Mon Sep 17 00:00:00 2001 From: loudongfeng Date: Tue, 31 Aug 2021 20:15:13 +0800 Subject: [PATCH 03/10] PARQUET-2078 Failed to read parquet file after writing with the same parquet version addressing review comments: more check on writer side. --- .../format/converter/ParquetMetadataConverter.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 6d87e9964a..6cf1ee2c76 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -201,9 +201,18 @@ public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parque List blocks = parquetMetadata.getBlocks(); List rowGroups = new ArrayList(); long numRows = 0; + long preBlockStartPos = 0; + long preBlockCompressedSize = 0; for (BlockMetaData block : blocks) { numRows += block.getRowCount(); + long blockStartPos = block.getStartingPos(); + if (preBlockStartPos != 0) { + assert blockStartPos == preBlockStartPos + preBlockCompressedSize; + } + preBlockStartPos = blockStartPos; + preBlockCompressedSize = block.getCompressedSize(); addRowGroup(parquetMetadata, rowGroups, block, fileEncryptor); + } FileMetaData fileMetaData = new FileMetaData( currentVersion, From 0e01c9038437f8aee99b197140453a47b399b9a3 Mon Sep 17 00:00:00 2001 From: loudongfeng Date: Wed, 1 Sep 2021 15:20:14 +0800 Subject: [PATCH 04/10] PARQUET-2078 Failed to read parquet file after writing with the same parquet version taking alignment padding and sumarry file into account --- .../converter/ParquetMetadataConverter.java | 47 ++++++++++++------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 6cf1ee2c76..f3dc9fe3c2 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -206,8 +206,13 @@ public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parque for (BlockMetaData block : blocks) { numRows += block.getRowCount(); long blockStartPos = block.getStartingPos(); + // first block + if (blockStartPos == 4) { + preBlockStartPos = 0; + preBlockCompressedSize = 0; + } if (preBlockStartPos != 0) { - assert blockStartPos == preBlockStartPos + preBlockCompressedSize; + assert blockStartPos >= preBlockStartPos + preBlockCompressedSize; } preBlockStartPos = blockStartPos; preBlockCompressedSize = block.getCompressedSize(); @@ -1243,14 +1248,9 @@ static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMet if (rowGroup.isSetFile_offset()) { assert rowGroup.isSetTotal_compressed_size(); - if (preStartIndex == 0) { - //the first block always holds the truth - startIndex = rowGroup.getFile_offset(); - } else { - //calculate offset for other blocks - startIndex = preStartIndex + preCompressedSize; - } - + //the file_offset of first block always holds the truth, while other blocks don't : + //see PARQUET-2078 for details + startIndex = tryUseFileOffset(rowGroup, preStartIndex, preCompressedSize); preStartIndex = startIndex; preCompressedSize = rowGroup.getTotal_compressed_size(); @@ -1276,6 +1276,24 @@ static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMet return metaData; } + private static long tryUseFileOffset(RowGroup rowGroup, long preStartIndex, long preCompressedSize) { + long startIndex = 0; + startIndex = rowGroup.getFile_offset(); + // skip checking the first rowGroup + // (in case of summary file, there are multiple first groups from different footers) + if (preStartIndex != 0 && preStartIndex <= startIndex) { + + //calculate start index for other blocks + long minStartIndex = preStartIndex + preCompressedSize; + if (startIndex < minStartIndex) { + // a bad offset detected, try first column's offset + // can not use minStartIndex in case of padding + startIndex = getOffset(rowGroup.getColumns().get(0)); + } + } + return startIndex; + } + // Visible for testing static FileMetaData filterFileMetaDataByStart(FileMetaData metaData, OffsetMetadataFilter filter) { List rowGroups = metaData.getRow_groups(); @@ -1287,14 +1305,9 @@ static FileMetaData filterFileMetaDataByStart(FileMetaData metaData, OffsetMetad if (rowGroup.isSetFile_offset()) { assert rowGroup.isSetTotal_compressed_size(); - if (preStartIndex == 0) { - //the first block always holds the truth - startIndex = rowGroup.getFile_offset(); - } else { - //calculate offset for other blocks - startIndex = preStartIndex + preCompressedSize; - } - + //the file_offset of first block always holds the truth, while other blocks don't : + //see PARQUET-2078 for details + startIndex = tryUseFileOffset(rowGroup, preStartIndex, preCompressedSize); preStartIndex = startIndex; preCompressedSize = rowGroup.getTotal_compressed_size(); From 3a2757d9fc0dd0cf1c8a4401787ba2b91e3bf8c9 Mon Sep 17 00:00:00 2001 From: loudongfeng Date: Thu, 2 Sep 2021 15:32:23 +0800 Subject: [PATCH 05/10] PARQUET-2078 Failed to read parquet file after writing with the same parquet version only throw exception when: 1.footer(first column of block meta) encrypted and 2.file_offset corrupted --- .../io/InvalidFileOffsetException.java | 29 ++++++++++++++ .../converter/ParquetMetadataConverter.java | 39 ++++++++++++------- 2 files changed, 54 insertions(+), 14 deletions(-) create mode 100644 parquet-column/src/main/java/org/apache/parquet/io/InvalidFileOffsetException.java diff --git a/parquet-column/src/main/java/org/apache/parquet/io/InvalidFileOffsetException.java b/parquet-column/src/main/java/org/apache/parquet/io/InvalidFileOffsetException.java new file mode 100644 index 0000000000..9c0cbe3999 --- /dev/null +++ b/parquet-column/src/main/java/org/apache/parquet/io/InvalidFileOffsetException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.io; + +import org.apache.parquet.ParquetRuntimeException; + +public class InvalidFileOffsetException extends ParquetRuntimeException { + private static final long serialVersionUID = 1L; + + public InvalidFileOffsetException(String message) { + super(message); + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index f3dc9fe3c2..622e3d3cf6 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -119,6 +119,7 @@ import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder; import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder; import org.apache.parquet.internal.hadoop.metadata.IndexReference; +import org.apache.parquet.io.InvalidFileOffsetException; import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.ColumnOrder.ColumnOrderName; @@ -1245,17 +1246,23 @@ static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMet for (RowGroup rowGroup : rowGroups) { long totalSize = 0; long startIndex; - if (rowGroup.isSetFile_offset()) { + ColumnChunk columnChunk = rowGroup.getColumns().get(0); + if (columnChunk.isSetMeta_data()) { + startIndex = getOffset(columnChunk); + } else { + assert rowGroup.isSetFile_offset(); assert rowGroup.isSetTotal_compressed_size(); //the file_offset of first block always holds the truth, while other blocks don't : //see PARQUET-2078 for details - startIndex = tryUseFileOffset(rowGroup, preStartIndex, preCompressedSize); + startIndex = rowGroup.getFile_offset(); + if (corruptedFileOffset(startIndex, preStartIndex, preCompressedSize)) { + // use minStartIndex(imprecise in case of padding, but good enough for filtering) + startIndex = preStartIndex + preCompressedSize; + } preStartIndex = startIndex; preCompressedSize = rowGroup.getTotal_compressed_size(); - } else { - startIndex = getOffset(rowGroup.getColumns().get(0)); } if (rowGroup.isSetTotal_compressed_size()) { @@ -1276,22 +1283,20 @@ static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMet return metaData; } - private static long tryUseFileOffset(RowGroup rowGroup, long preStartIndex, long preCompressedSize) { - long startIndex = 0; - startIndex = rowGroup.getFile_offset(); + private static boolean corruptedFileOffset(long startIndex, long preStartIndex, long preCompressedSize) { + boolean corrupted = false; // skip checking the first rowGroup // (in case of summary file, there are multiple first groups from different footers) if (preStartIndex != 0 && preStartIndex <= startIndex) { - //calculate start index for other blocks long minStartIndex = preStartIndex + preCompressedSize; if (startIndex < minStartIndex) { // a bad offset detected, try first column's offset // can not use minStartIndex in case of padding - startIndex = getOffset(rowGroup.getColumns().get(0)); + corrupted = true; } } - return startIndex; + return corrupted; } // Visible for testing @@ -1302,17 +1307,23 @@ static FileMetaData filterFileMetaDataByStart(FileMetaData metaData, OffsetMetad long preCompressedSize = 0; for (RowGroup rowGroup : rowGroups) { long startIndex; - if (rowGroup.isSetFile_offset()) { + ColumnChunk columnChunk = rowGroup.getColumns().get(0); + if (columnChunk.isSetMeta_data()) { + startIndex = getOffset(columnChunk); + } else { + assert rowGroup.isSetFile_offset(); assert rowGroup.isSetTotal_compressed_size(); //the file_offset of first block always holds the truth, while other blocks don't : //see PARQUET-2078 for details - startIndex = tryUseFileOffset(rowGroup, preStartIndex, preCompressedSize); + startIndex = rowGroup.getFile_offset(); + if (corruptedFileOffset(startIndex, preStartIndex, preCompressedSize)) { + throw new InvalidFileOffsetException("corrupted RowGroup.file_offset found, " + + "please use file range instead of block offset for split."); + } preStartIndex = startIndex; preCompressedSize = rowGroup.getTotal_compressed_size(); - } else { - startIndex = getOffset(rowGroup.getColumns().get(0)); } if (filter.contains(startIndex)) { From cd13468687aa2f3c6390800f6247f4c5ecd1ee1d Mon Sep 17 00:00:00 2001 From: loudongfeng Date: Thu, 2 Sep 2021 17:51:15 +0800 Subject: [PATCH 06/10] PARQUET-2078 Failed to read parquet file after writing with the same parquet version only check firstColumnChunk.isSetMeta_data() for the first block --- .../converter/ParquetMetadataConverter.java | 25 ++++++++++++------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 622e3d3cf6..94ad7cc861 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -123,7 +123,6 @@ import org.apache.parquet.io.ParquetDecodingException; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.ColumnOrder.ColumnOrderName; -import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor; import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; @@ -1243,11 +1242,15 @@ static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMet List newRowGroups = new ArrayList(); long preStartIndex = 0; long preCompressedSize = 0; + boolean firstColumnWithMetadata = true; + if (rowGroups != null && rowGroups.size() > 0) { + firstColumnWithMetadata = rowGroups.get(0).getColumns().get(0).isSetMeta_data(); + } for (RowGroup rowGroup : rowGroups) { long totalSize = 0; long startIndex; ColumnChunk columnChunk = rowGroup.getColumns().get(0); - if (columnChunk.isSetMeta_data()) { + if (firstColumnWithMetadata) { startIndex = getOffset(columnChunk); } else { assert rowGroup.isSetFile_offset(); @@ -1256,7 +1259,7 @@ static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMet //the file_offset of first block always holds the truth, while other blocks don't : //see PARQUET-2078 for details startIndex = rowGroup.getFile_offset(); - if (corruptedFileOffset(startIndex, preStartIndex, preCompressedSize)) { + if (invalidFileOffset(startIndex, preStartIndex, preCompressedSize)) { // use minStartIndex(imprecise in case of padding, but good enough for filtering) startIndex = preStartIndex + preCompressedSize; } @@ -1283,8 +1286,8 @@ static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMet return metaData; } - private static boolean corruptedFileOffset(long startIndex, long preStartIndex, long preCompressedSize) { - boolean corrupted = false; + private static boolean invalidFileOffset(long startIndex, long preStartIndex, long preCompressedSize) { + boolean invalid = false; // skip checking the first rowGroup // (in case of summary file, there are multiple first groups from different footers) if (preStartIndex != 0 && preStartIndex <= startIndex) { @@ -1293,10 +1296,10 @@ private static boolean corruptedFileOffset(long startIndex, long preStartIndex, if (startIndex < minStartIndex) { // a bad offset detected, try first column's offset // can not use minStartIndex in case of padding - corrupted = true; + invalid = true; } } - return corrupted; + return invalid; } // Visible for testing @@ -1305,10 +1308,14 @@ static FileMetaData filterFileMetaDataByStart(FileMetaData metaData, OffsetMetad List newRowGroups = new ArrayList(); long preStartIndex = 0; long preCompressedSize = 0; + boolean firstColumnWithMetadata = true; + if (rowGroups != null && rowGroups.size() > 0) { + firstColumnWithMetadata = rowGroups.get(0).getColumns().get(0).isSetMeta_data(); + } for (RowGroup rowGroup : rowGroups) { long startIndex; ColumnChunk columnChunk = rowGroup.getColumns().get(0); - if (columnChunk.isSetMeta_data()) { + if (firstColumnWithMetadata) { startIndex = getOffset(columnChunk); } else { assert rowGroup.isSetFile_offset(); @@ -1317,7 +1324,7 @@ static FileMetaData filterFileMetaDataByStart(FileMetaData metaData, OffsetMetad //the file_offset of first block always holds the truth, while other blocks don't : //see PARQUET-2078 for details startIndex = rowGroup.getFile_offset(); - if (corruptedFileOffset(startIndex, preStartIndex, preCompressedSize)) { + if (invalidFileOffset(startIndex, preStartIndex, preCompressedSize)) { throw new InvalidFileOffsetException("corrupted RowGroup.file_offset found, " + "please use file range instead of block offset for split."); } From 347858dbd1844978e91b0a6cffc30f2e6040594a Mon Sep 17 00:00:00 2001 From: loudongfeng Date: Wed, 8 Sep 2021 15:43:29 +0800 Subject: [PATCH 07/10] PARQUET-2078 Failed to read parquet file after writing with the same parquet version address review comments: empty lines --- .../parquet/format/converter/ParquetMetadataConverter.java | 3 --- .../java/org/apache/parquet/hadoop/TestParquetFileWriter.java | 2 -- 2 files changed, 5 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 94ad7cc861..3dfe97a144 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -217,7 +217,6 @@ public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parque preBlockStartPos = blockStartPos; preBlockCompressedSize = block.getCompressedSize(); addRowGroup(parquetMetadata, rowGroups, block, fileEncryptor); - } FileMetaData fileMetaData = new FileMetaData( currentVersion, @@ -1265,7 +1264,6 @@ static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMet } preStartIndex = startIndex; preCompressedSize = rowGroup.getTotal_compressed_size(); - } if (rowGroup.isSetTotal_compressed_size()) { @@ -1330,7 +1328,6 @@ static FileMetaData filterFileMetaDataByStart(FileMetaData metaData, OffsetMetad } preStartIndex = startIndex; preCompressedSize = rowGroup.getTotal_compressed_size(); - } if (filter.contains(startIndex)) { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java index 49e071e525..8dcbf4acf4 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java @@ -199,7 +199,6 @@ public void testWriteRead() throws Exception { assertEquals(c2Ends - c2Starts, rowGroup.getColumns().get(1).getTotalSize()); assertEquals(c2Ends - c1Starts, rowGroup.getTotalByteSize()); - assertEquals(c1Starts, rowGroup.getColumns().get(0).getStartingPos()); assertEquals(0, rowGroup.getColumns().get(0).getDictionaryPageOffset()); assertEquals(c1p1Starts, rowGroup.getColumns().get(0).getFirstDataPageOffset()); @@ -291,7 +290,6 @@ public void testWriteReadWithRecordReader() throws Exception { BlockMetaData rowGroup = readFooter.getBlocks().get(0); assertEquals(c2Ends - c2Starts, rowGroup.getColumns().get(1).getTotalSize()); - assertEquals(0, rowGroup.getColumns().get(0).getDictionaryPageOffset()); assertEquals(c2Starts, rowGroup.getColumns().get(1).getStartingPos()); assertEquals(c2Starts, rowGroup.getColumns().get(1).getDictionaryPageOffset()); From aefe1369ce24aa6c3e84a015320637b836b93a67 Mon Sep 17 00:00:00 2001 From: loudongfeng Date: Thu, 9 Sep 2021 10:49:17 +0800 Subject: [PATCH 08/10] PARQUET-2078 Failed to read parquet file after writing with the same parquet version check first rowgroup's file_offset too(SPARK-36696) --- .../converter/ParquetMetadataConverter.java | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 3dfe97a144..ed980d56f3 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -1286,17 +1286,25 @@ static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMet private static boolean invalidFileOffset(long startIndex, long preStartIndex, long preCompressedSize) { boolean invalid = false; - // skip checking the first rowGroup // (in case of summary file, there are multiple first groups from different footers) - if (preStartIndex != 0 && preStartIndex <= startIndex) { - //calculate start index for other blocks - long minStartIndex = preStartIndex + preCompressedSize; - if (startIndex < minStartIndex) { - // a bad offset detected, try first column's offset - // can not use minStartIndex in case of padding - invalid = true; - } + if (preStartIndex > startIndex) { + preStartIndex = 0; + preCompressedSize = 0; + } + // checking the first rowGroup + if (preStartIndex == 0 && startIndex != 4) { + invalid = true; + return invalid; } + + //calculate start index for other blocks + long minStartIndex = preStartIndex + preCompressedSize; + if (startIndex < minStartIndex) { + // a bad offset detected, try first column's offset + // can not use minStartIndex in case of padding + invalid = true; + } + return invalid; } From 00e737e26b1a2462ab39660d60f0a7fc5c9cc4d2 Mon Sep 17 00:00:00 2001 From: loudongfeng Date: Thu, 9 Sep 2021 11:41:25 +0800 Subject: [PATCH 09/10] PARQUET-2078 Failed to read parquet file after writing with the same parquet version Using Preconditions.checkState instead of assert in write path remove summary file footers case check in read path(which will never happen) --- .../format/converter/ParquetMetadataConverter.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index ed980d56f3..ab7b57f0b7 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -45,6 +45,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.parquet.CorruptStatistics; import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.Preconditions; import org.apache.parquet.bytes.ByteBufferInputStream; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.statistics.BinaryStatistics; @@ -212,7 +213,8 @@ public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parque preBlockCompressedSize = 0; } if (preBlockStartPos != 0) { - assert blockStartPos >= preBlockStartPos + preBlockCompressedSize; + Preconditions.checkState(blockStartPos >= preBlockStartPos + preBlockCompressedSize, + "Invalid block starting position:" + blockStartPos); } preBlockStartPos = blockStartPos; preBlockCompressedSize = block.getCompressedSize(); @@ -1286,11 +1288,7 @@ static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMet private static boolean invalidFileOffset(long startIndex, long preStartIndex, long preCompressedSize) { boolean invalid = false; - // (in case of summary file, there are multiple first groups from different footers) - if (preStartIndex > startIndex) { - preStartIndex = 0; - preCompressedSize = 0; - } + assert preStartIndex <= startIndex; // checking the first rowGroup if (preStartIndex == 0 && startIndex != 4) { invalid = true; From c1802ecb11eaa134065a98f46c44b7013b684cfa Mon Sep 17 00:00:00 2001 From: loudongfeng Date: Thu, 9 Sep 2021 17:46:02 +0800 Subject: [PATCH 10/10] PARQUET-2078 Failed to read parquet file after writing with the same parquet version more special case for first row group --- .../converter/ParquetMetadataConverter.java | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index ab7b57f0b7..3c6e32cfde 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -1261,8 +1261,13 @@ static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMet //see PARQUET-2078 for details startIndex = rowGroup.getFile_offset(); if (invalidFileOffset(startIndex, preStartIndex, preCompressedSize)) { - // use minStartIndex(imprecise in case of padding, but good enough for filtering) - startIndex = preStartIndex + preCompressedSize; + //first row group's offset is always 4 + if (preStartIndex == 0) { + startIndex = 4; + } else { + // use minStartIndex(imprecise in case of padding, but good enough for filtering) + startIndex = preStartIndex + preCompressedSize; + } } preStartIndex = startIndex; preCompressedSize = rowGroup.getTotal_compressed_size(); @@ -1329,8 +1334,13 @@ static FileMetaData filterFileMetaDataByStart(FileMetaData metaData, OffsetMetad //see PARQUET-2078 for details startIndex = rowGroup.getFile_offset(); if (invalidFileOffset(startIndex, preStartIndex, preCompressedSize)) { - throw new InvalidFileOffsetException("corrupted RowGroup.file_offset found, " + - "please use file range instead of block offset for split."); + //first row group's offset is always 4 + if (preStartIndex == 0) { + startIndex = 4; + } else { + throw new InvalidFileOffsetException("corrupted RowGroup.file_offset found, " + + "please use file range instead of block offset for split."); + } } preStartIndex = startIndex; preCompressedSize = rowGroup.getTotal_compressed_size();