diff --git a/parquet-column/src/main/java/org/apache/parquet/io/InvalidFileOffsetException.java b/parquet-column/src/main/java/org/apache/parquet/io/InvalidFileOffsetException.java
new file mode 100644
index 0000000000..9c0cbe3999
--- /dev/null
+++ b/parquet-column/src/main/java/org/apache/parquet/io/InvalidFileOffsetException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.io;
+
+import org.apache.parquet.ParquetRuntimeException;
+
+public class InvalidFileOffsetException extends ParquetRuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  public InvalidFileOffsetException(String message) {
+    super(message);
+  }
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 3a10b1c895..3c6e32cfde 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -45,6 +45,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.CorruptStatistics;
 import org.apache.parquet.ParquetReadOptions;
+import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.statistics.BinaryStatistics;
@@ -119,10 +120,10 @@
 import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
 import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
 import org.apache.parquet.internal.hadoop.metadata.IndexReference;
+import org.apache.parquet.io.InvalidFileOffsetException;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.ColumnOrder.ColumnOrderName;
-import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
 import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
@@ -201,8 +202,22 @@ public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parque
     List<BlockMetaData> blocks = parquetMetadata.getBlocks();
     List<RowGroup> rowGroups = new ArrayList<RowGroup>();
     long numRows = 0;
+    long preBlockStartPos = 0;
+    long preBlockCompressedSize = 0;
     for (BlockMetaData block : blocks) {
       numRows += block.getRowCount();
+      long blockStartPos = block.getStartingPos();
+      // first block
+      if (blockStartPos == 4) {
+        preBlockStartPos = 0;
+        preBlockCompressedSize = 0;
+      }
+      if (preBlockStartPos != 0) {
+        Preconditions.checkState(blockStartPos >= preBlockStartPos + preBlockCompressedSize,
+          "Invalid block starting position: " + blockStartPos);
+      }
+      preBlockStartPos = blockStartPos;
+      preBlockCompressedSize = block.getCompressedSize();
       addRowGroup(parquetMetadata, rowGroups, block, fileEncryptor);
     }
     FileMetaData fileMetaData = new FileMetaData(
@@ -1226,14 +1241,36 @@ public ParquetMetadata readParquetMetadata(InputStream from) throws IOException
   static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) {
     List<RowGroup> rowGroups = metaData.getRow_groups();
     List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
+    long preStartIndex = 0;
+    long preCompressedSize = 0;
+    boolean firstColumnWithMetadata = true;
+    if (rowGroups != null && rowGroups.size() > 0) {
+      firstColumnWithMetadata = rowGroups.get(0).getColumns().get(0).isSetMeta_data();
+    }
     for (RowGroup rowGroup : rowGroups) {
       long totalSize = 0;
       long startIndex;
+      ColumnChunk columnChunk = rowGroup.getColumns().get(0);
+      if (firstColumnWithMetadata) {
+        startIndex = getOffset(columnChunk);
+      } else {
+        assert rowGroup.isSetFile_offset();
+        assert rowGroup.isSetTotal_compressed_size();
-      if (rowGroup.isSetFile_offset()) {
+        //the file_offset of first block always holds the truth, while other blocks don't:
+        //see PARQUET-2078 for details
         startIndex = rowGroup.getFile_offset();
-      } else {
-        startIndex = getOffset(rowGroup.getColumns().get(0));
+        if (invalidFileOffset(startIndex, preStartIndex, preCompressedSize)) {
+          //first row group's offset is always 4
+          if (preStartIndex == 0) {
+            startIndex = 4;
+          } else {
+            // use minStartIndex (imprecise in case of padding, but good enough for filtering)
+            startIndex = preStartIndex + preCompressedSize;
+          }
+        }
+        preStartIndex = startIndex;
+        preCompressedSize = rowGroup.getTotal_compressed_size();
       }

       if (rowGroup.isSetTotal_compressed_size()) {
@@ -1254,16 +1291,59 @@ static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMet
     return metaData;
   }

+  private static boolean invalidFileOffset(long startIndex, long preStartIndex, long preCompressedSize) {
+    boolean invalid = false;
+    assert preStartIndex <= startIndex;
+    // checking the first rowGroup
+    if (preStartIndex == 0 && startIndex != 4) {
+      invalid = true;
+      return invalid;
+    }
+
+    //calculate start index for other blocks
+    long minStartIndex = preStartIndex + preCompressedSize;
+    if (startIndex < minStartIndex) {
+      // a bad offset detected, try first column's offset
+      // can not use minStartIndex in case of padding
+      invalid = true;
+    }
+
+    return invalid;
+  }
+
   // Visible for testing
   static FileMetaData filterFileMetaDataByStart(FileMetaData metaData, OffsetMetadataFilter filter) {
     List<RowGroup> rowGroups = metaData.getRow_groups();
     List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
+    long preStartIndex = 0;
+    long preCompressedSize = 0;
+    boolean firstColumnWithMetadata = true;
+    if (rowGroups != null && rowGroups.size() > 0) {
+      firstColumnWithMetadata = rowGroups.get(0).getColumns().get(0).isSetMeta_data();
+    }
     for (RowGroup rowGroup : rowGroups) {
       long startIndex;
-      if (rowGroup.isSetFile_offset()) {
-        startIndex = rowGroup.getFile_offset();
+      ColumnChunk columnChunk = rowGroup.getColumns().get(0);
+      if (firstColumnWithMetadata) {
+        startIndex = getOffset(columnChunk);
       } else {
-        startIndex = getOffset(rowGroup.getColumns().get(0));
+        assert rowGroup.isSetFile_offset();
+        assert rowGroup.isSetTotal_compressed_size();
+
+        //the file_offset of first block always holds the truth, while other blocks don't:
+        //see PARQUET-2078 for details
+        startIndex = rowGroup.getFile_offset();
+        if (invalidFileOffset(startIndex, preStartIndex, preCompressedSize)) {
+          //first row group's offset is always 4
+          if (preStartIndex == 0) {
+            startIndex = 4;
+          } else {
+            throw new InvalidFileOffsetException("corrupted RowGroup.file_offset found, "
+              + "please use file range instead of block offset for split.");
+          }
+        }
+        preStartIndex = startIndex;
+        preCompressedSize = rowGroup.getTotal_compressed_size();
       }

       if (filter.contains(startIndex)) {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index afbdf7637e..2e5d55c28c 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -852,6 +852,7 @@ public void endColumn() throws IOException {
     this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
     this.uncompressedLength = 0;
     this.compressedLength = 0;
+    this.currentChunkDictionaryPageOffset = 0;
     columnIndexBuilder = null;
     offsetIndexBuilder = null;
   }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 5b8c5ed1b4..8dcbf4acf4 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -23,6 +23,10 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Version;
 import org.apache.parquet.bytes.BytesUtils;
@@ -30,6 +34,8 @@
 import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
 import org.apache.parquet.column.values.bloomfilter.BloomFilter;
 import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.util.ContextUtil;
 import org.apache.parquet.io.ParquetEncodingException;
 import org.junit.Assume;
 import org.junit.Rule;
@@ -67,6 +73,7 @@
 import static org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics;
 import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.apache.parquet.hadoop.ParquetInputFormat.READ_SUPPORT_CLASS;
 import static org.junit.Assert.*;
 import static org.apache.parquet.column.Encoding.BIT_PACKED;
 import static org.apache.parquet.column.Encoding.PLAIN;
@@ -75,6 +82,7 @@
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.Type.Repetition.*;
 import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir;
+import static org.junit.Assert.assertEquals;

 import org.apache.parquet.example.data.Group;
 import org.apache.parquet.example.data.simple.SimpleGroup;
@@ -239,6 +247,81 @@ public void testWriteRead() throws Exception {
     PrintFooter.main(new String[] {path.toString()});
   }

+  @Test
+  public void testWriteReadWithRecordReader() throws Exception {
+    File testFile = temp.newFile();
+    testFile.delete();
+
+    Path path = new Path(testFile.toURI());
+    Configuration configuration = new Configuration();
+
+    ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path);
+    w.start();
+    w.startBlock(3);
+    w.startColumn(C1, 5, CODEC);
+    w.writeDataPage(2, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.startColumn(C2, 6, CODEC);
+    long c2Starts = w.getPos();
+    w.writeDictionaryPage(new DictionaryPage(BytesInput.from(BYTES2), 4, RLE_DICTIONARY));
+    long c2p1Starts = w.getPos();
+    w.writeDataPage(2, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(3, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.writeDataPage(1, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    long c2Ends = w.getPos();
+    w.endBlock();
+    w.startBlock(4);
+    w.startColumn(C1, 7, CODEC);
+    long c1Bock2Starts = w.getPos();
+    long c1p1Bock2Starts = w.getPos();
+    w.writeDataPage(7, 4, BytesInput.from(BYTES3), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    long c1Block2Ends = w.getPos();
+    w.startColumn(C2, 8, CODEC);
+    w.writeDataPage(8, 4, BytesInput.from(BYTES4), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
+    w.endColumn();
+    w.endBlock();
+    w.end(new HashMap());
+
+    ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
+    assertEquals("footer: " + readFooter, 2, readFooter.getBlocks().size());
+    BlockMetaData rowGroup = readFooter.getBlocks().get(0);
+    assertEquals(c2Ends - c2Starts, rowGroup.getColumns().get(1).getTotalSize());
+
+    assertEquals(0, rowGroup.getColumns().get(0).getDictionaryPageOffset());
+    assertEquals(c2Starts, rowGroup.getColumns().get(1).getStartingPos());
+    assertEquals(c2Starts, rowGroup.getColumns().get(1).getDictionaryPageOffset());
+    assertEquals(c2p1Starts, rowGroup.getColumns().get(1).getFirstDataPageOffset());
+
+    BlockMetaData rowGroup2 = readFooter.getBlocks().get(1);
+    assertEquals(0, rowGroup2.getColumns().get(0).getDictionaryPageOffset());
+    assertEquals(c1Bock2Starts, rowGroup2.getColumns().get(0).getStartingPos());
+    assertEquals(c1p1Bock2Starts, rowGroup2.getColumns().get(0).getFirstDataPageOffset());
+    assertEquals(c1Block2Ends - c1Bock2Starts, rowGroup2.getColumns().get(0).getTotalSize());
+
+    HashSet expectedEncoding = new HashSet();
+    expectedEncoding.add(PLAIN);
+    expectedEncoding.add(BIT_PACKED);
+    assertEquals(expectedEncoding, rowGroup.getColumns().get(0).getEncodings());
+
+    ParquetInputSplit split = new ParquetInputSplit(path, 0, w.getPos(), null,
+        readFooter.getBlocks(), SCHEMA.toString(),
+        readFooter.getFileMetaData().getSchema().toString(),
+        readFooter.getFileMetaData().getKeyValueMetaData(),
+        null);
+    ParquetInputFormat input = new ParquetInputFormat();
+    configuration.set(READ_SUPPORT_CLASS, GroupReadSupport.class.getName());
+    TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt_0_1_m_1_1");
+    TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(configuration, taskAttemptID);
+    RecordReader reader = input.createRecordReader(split, taskContext);
+    assertTrue(reader instanceof ParquetRecordReader);
+    //RowGroup.file_offset is checked here
+    reader.initialize(split, taskContext);
+    reader.close();
+  }
+
   @Test
   public void testWriteEmptyBlock() throws Exception {
     File testFile = temp.newFile();
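Note (illustrative only, not part of the patch): the fallback that filterFileMetaDataByMidpoint applies to a corrupted RowGroup.file_offset can be summarized as the standalone Java sketch below; the class and method names (RowGroupOffsets, recoverStart) are hypothetical and do not exist in Parquet.

    // Sketch of the midpoint-filter fallback above, under the same invariants the patch assumes:
    // the first row group starts right after the 4-byte "PAR1" magic, and every later row group
    // starts at or after the end of the previous one.
    final class RowGroupOffsets {
      static long recoverStart(long fileOffset, long prevStart, long prevCompressedSize) {
        if (prevStart == 0) {
          // first row group: any file_offset other than 4 is corrupt, so use 4
          return 4;
        }
        long minStart = prevStart + prevCompressedSize;
        // a file_offset before the end of the previous row group is corrupt (PARQUET-2078);
        // minStart is imprecise when padding is present, but good enough for range filtering
        return Math.max(fileOffset, minStart);
      }
    }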