@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.io;

import org.apache.parquet.ParquetRuntimeException;

public class InvalidFileOffsetException extends ParquetRuntimeException {
private static final long serialVersionUID = 1L;

public InvalidFileOffsetException(String message) {
super(message);
}
}
@@ -45,6 +45,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.CorruptStatistics;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Preconditions;
import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.statistics.BinaryStatistics;
@@ -119,10 +120,10 @@
import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
import org.apache.parquet.internal.hadoop.metadata.IndexReference;
import org.apache.parquet.io.InvalidFileOffsetException;
import org.apache.parquet.io.ParquetDecodingException;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.ColumnOrder.ColumnOrderName;
import org.apache.parquet.schema.LogicalTypeAnnotation.LogicalTypeAnnotationVisitor;
import org.apache.parquet.schema.LogicalTypeAnnotation.UUIDLogicalTypeAnnotation;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
@@ -201,8 +202,22 @@ public FileMetaData toParquetMetadata(int currentVersion, ParquetMetadata parque
List<BlockMetaData> blocks = parquetMetadata.getBlocks();
List<RowGroup> rowGroups = new ArrayList<RowGroup>();
long numRows = 0;
long preBlockStartPos = 0;
long preBlockCompressedSize = 0;
for (BlockMetaData block : blocks) {
numRows += block.getRowCount();
long blockStartPos = block.getStartingPos();
// first block
if (blockStartPos == 4) {
Contributor: Why is this check necessary? Doesn't the first block always start at 4, or does this address a file-merging use case?

Contributor (author): It addresses the _common_metadata case, where multiple file footers are merged into a single metadata file.

preBlockStartPos = 0;
preBlockCompressedSize = 0;
}
if (preBlockStartPos != 0) {
Preconditions.checkState(blockStartPos >= preBlockStartPos + preBlockCompressedSize,
"Invalid block starting position:" + blockStartPos);
}
preBlockStartPos = blockStartPos;
preBlockCompressedSize = block.getCompressedSize();
addRowGroup(parquetMetadata, rowGroups, block, fileEncryptor);
}
FileMetaData fileMetaData = new FileMetaData(
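
For context on the exchange above: every Parquet file begins with the 4-byte "PAR1" magic, so a file's first row group starts at offset 4, and in a merged _common_metadata footer the starting positions drop back to 4 at each file boundary. Below is a minimal standalone sketch of the check added in this hunk; the class name and the sample offsets are made up for illustration and are not part of the PR.

// Standalone illustration (not parquet-mr code) of the row-group offset validation above:
// offsets must be non-decreasing within one file, and an offset of 4 marks the first row
// group of a (possibly merged) file, so the running state resets there.
public class RowGroupOffsetCheckSketch {
  public static void main(String[] args) {
    // two files merged into one footer: the second file's offsets start again at 4
    long[] startPos       = {4, 1500, 4, 1200};
    long[] compressedSize = {1400, 700, 1100, 800};

    long preBlockStartPos = 0;
    long preBlockCompressedSize = 0;
    for (int i = 0; i < startPos.length; i++) {
      long blockStartPos = startPos[i];
      if (blockStartPos == 4) {           // first block of a file: reset the running state
        preBlockStartPos = 0;
        preBlockCompressedSize = 0;
      }
      if (preBlockStartPos != 0
          && blockStartPos < preBlockStartPos + preBlockCompressedSize) {
        throw new IllegalStateException("Invalid block starting position: " + blockStartPos);
      }
      preBlockStartPos = blockStartPos;
      preBlockCompressedSize = compressedSize[i];
      System.out.println("row group " + i + " starts at " + blockStartPos + ": ok");
    }
  }
}
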
@@ -1226,14 +1241,36 @@ public ParquetMetadata readParquetMetadata(InputStream from) throws IOException
static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMetadataFilter filter) {
List<RowGroup> rowGroups = metaData.getRow_groups();
List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
long preStartIndex = 0;
long preCompressedSize = 0;
boolean firstColumnWithMetadata = true;
if (rowGroups != null && rowGroups.size() > 0) {
firstColumnWithMetadata = rowGroups.get(0).getColumns().get(0).isSetMeta_data();
}
for (RowGroup rowGroup : rowGroups) {
long totalSize = 0;
long startIndex;
ColumnChunk columnChunk = rowGroup.getColumns().get(0);
if (firstColumnWithMetadata) {
startIndex = getOffset(columnChunk);
} else {
assert rowGroup.isSetFile_offset();
assert rowGroup.isSetTotal_compressed_size();

//the file_offset of the first block always holds the truth, while other blocks' offsets may not:
//see PARQUET-2078 for details

Member: I think this is no longer true with the issue we found in parquet-cpp.

Contributor (author): @sunchao Thanks for the information. Maybe the first row-group check is enough for this situation (firstFileOffset == 4)? I will submit a new commit.

startIndex = rowGroup.getFile_offset();
if (invalidFileOffset(startIndex, preStartIndex, preCompressedSize)) {
//first row group's offset is always 4
if (preStartIndex == 0) {
startIndex = 4;
} else {
// use minStartIndex(imprecise in case of padding, but good enough for filtering)
startIndex = preStartIndex + preCompressedSize;
}
}
preStartIndex = startIndex;
preCompressedSize = rowGroup.getTotal_compressed_size();
}

if (rowGroup.isSetTotal_compressed_size()) {
@@ -1254,16 +1291,59 @@ static FileMetaData filterFileMetaDataByMidpoint(FileMetaData metaData, RangeMet
return metaData;
}

private static boolean invalidFileOffset(long startIndex, long preStartIndex, long preCompressedSize) {
boolean invalid = false;
assert preStartIndex <= startIndex;
// checking the first rowGroup
if (preStartIndex == 0 && startIndex != 4) {
invalid = true;
return invalid;
}

//calculate start index for other blocks
long minStartIndex = preStartIndex + preCompressedSize;
if (startIndex < minStartIndex) {
// a bad offset detected, try first column's offset
// can not use minStartIndex in case of padding
invalid = true;
}

return invalid;
}
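
The helper above drives both filter methods: a corrupted RowGroup.file_offset (PARQUET-2078) is detected when it would overlap the previous row group; the midpoint filter then falls back to an estimated start, while the start filter throws InvalidFileOffsetException. The following is a small self-contained sketch of that recovery, using made-up offsets and a hypothetical class name, not the PR's code.

// Self-contained sketch (not parquet-mr code) of the file_offset recovery performed by
// filterFileMetaDataByMidpoint: a corrupt offset is detected via invalidFileOffset() and
// replaced by 4 for the first row group, or by preStartIndex + preCompressedSize afterwards.
public class FileOffsetRecoverySketch {
  static boolean invalidFileOffset(long startIndex, long preStartIndex, long preCompressedSize) {
    if (preStartIndex == 0) {
      return startIndex != 4;                               // first row group must start at 4
    }
    return startIndex < preStartIndex + preCompressedSize;  // overlaps the previous row group
  }

  public static void main(String[] args) {
    long[] fileOffsets     = {4, 100, 5000};                // 100 points back inside row group 0: corrupt
    long[] compressedSizes = {3000, 1800, 900};

    long preStartIndex = 0;
    long preCompressedSize = 0;
    for (int i = 0; i < fileOffsets.length; i++) {
      long startIndex = fileOffsets[i];
      if (invalidFileOffset(startIndex, preStartIndex, preCompressedSize)) {
        // imprecise when the writer padded between row groups, but good enough for filtering
        startIndex = (preStartIndex == 0) ? 4 : preStartIndex + preCompressedSize;
        System.out.println("row group " + i + ": corrupt file_offset " + fileOffsets[i]
            + " recovered as " + startIndex);
      }
      preStartIndex = startIndex;
      preCompressedSize = compressedSizes[i];
    }
  }
}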

// Visible for testing
static FileMetaData filterFileMetaDataByStart(FileMetaData metaData, OffsetMetadataFilter filter) {
List<RowGroup> rowGroups = metaData.getRow_groups();
List<RowGroup> newRowGroups = new ArrayList<RowGroup>();
long preStartIndex = 0;
long preCompressedSize = 0;
boolean firstColumnWithMetadata = true;
if (rowGroups != null && rowGroups.size() > 0) {
firstColumnWithMetadata = rowGroups.get(0).getColumns().get(0).isSetMeta_data();
}
for (RowGroup rowGroup : rowGroups) {
long startIndex;
ColumnChunk columnChunk = rowGroup.getColumns().get(0);
if (firstColumnWithMetadata) {
startIndex = getOffset(columnChunk);
} else {
assert rowGroup.isSetFile_offset();
assert rowGroup.isSetTotal_compressed_size();

//the file_offset of the first block always holds the truth, while other blocks' offsets may not:
//see PARQUET-2078 for details
startIndex = rowGroup.getFile_offset();
if (invalidFileOffset(startIndex, preStartIndex, preCompressedSize)) {
//first row group's offset is always 4
if (preStartIndex == 0) {
startIndex = 4;
} else {
throw new InvalidFileOffsetException("corrupted RowGroup.file_offset found, " +
"please use file range instead of block offset for split.");
}
}
preStartIndex = startIndex;
preCompressedSize = rowGroup.getTotal_compressed_size();
}

if (filter.contains(startIndex)) {
@@ -852,6 +852,7 @@ public void endColumn() throws IOException {
this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
this.uncompressedLength = 0;
this.compressedLength = 0;
this.currentChunkDictionaryPageOffset = 0;
columnIndexBuilder = null;
offsetIndexBuilder = null;
}
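
A likely reading of the one-line reset above: without clearing currentChunkDictionaryPageOffset at the end of each chunk, a later column chunk that has no dictionary page would inherit the previous chunk's dictionary page offset in its metadata; the new test below asserts that such chunks report an offset of 0. Here is a toy sketch of that failure mode, with a hypothetical class and method shape that is not parquet-mr code.

// Toy illustration (hypothetical, not parquet-mr code) of per-chunk writer state
// leaking into the next chunk's metadata when it is not reset in endColumn().
public class ChunkStateSketch {
  private long currentChunkDictionaryPageOffset = 0;

  // returns the dictionary offset that would be recorded in the chunk's metadata
  long endColumn(Long dictionaryPageOffset, boolean resetState) {
    if (dictionaryPageOffset != null) {
      currentChunkDictionaryPageOffset = dictionaryPageOffset;
    }
    long recorded = currentChunkDictionaryPageOffset;
    if (resetState) {
      currentChunkDictionaryPageOffset = 0; // the fix: clear state for the next chunk
    }
    return recorded;
  }

  public static void main(String[] args) {
    ChunkStateSketch buggy = new ChunkStateSketch();
    buggy.endColumn(120L, false);                      // chunk with a dictionary at offset 120
    System.out.println(buggy.endColumn(null, false));  // prints 120: stale offset leaks

    ChunkStateSketch fixed = new ChunkStateSketch();
    fixed.endColumn(120L, true);
    System.out.println(fixed.endColumn(null, true));   // prints 0: no dictionary, no offset
  }
}
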
@@ -23,13 +23,19 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Version;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.util.ContextUtil;
import org.apache.parquet.io.ParquetEncodingException;
import org.junit.Assume;
import org.junit.Rule;
@@ -67,6 +73,7 @@

import static org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics;
import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
import static org.apache.parquet.hadoop.ParquetInputFormat.READ_SUPPORT_CLASS;
import static org.junit.Assert.*;
import static org.apache.parquet.column.Encoding.BIT_PACKED;
import static org.apache.parquet.column.Encoding.PLAIN;
@@ -75,6 +82,7 @@
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.Type.Repetition.*;
import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir;
import static org.junit.Assert.assertEquals;

import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.SimpleGroup;
@@ -239,6 +247,81 @@ public void testWriteRead() throws Exception {
PrintFooter.main(new String[] {path.toString()});
}

@Test
public void testWriteReadWithRecordReader() throws Exception {
File testFile = temp.newFile();
testFile.delete();

Path path = new Path(testFile.toURI());
Configuration configuration = new Configuration();

ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path);
w.start();
w.startBlock(3);
w.startColumn(C1, 5, CODEC);
w.writeDataPage(2, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
w.writeDataPage(3, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
w.endColumn();
w.startColumn(C2, 6, CODEC);
long c2Starts = w.getPos();
w.writeDictionaryPage(new DictionaryPage(BytesInput.from(BYTES2), 4, RLE_DICTIONARY));
long c2p1Starts = w.getPos();
w.writeDataPage(2, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
w.writeDataPage(3, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
w.writeDataPage(1, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
w.endColumn();
long c2Ends = w.getPos();
w.endBlock();
w.startBlock(4);
w.startColumn(C1, 7, CODEC);
long c1Block2Starts = w.getPos();
long c1p1Block2Starts = w.getPos();
w.writeDataPage(7, 4, BytesInput.from(BYTES3), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
w.endColumn();
long c1Block2Ends = w.getPos();
w.startColumn(C2, 8, CODEC);
w.writeDataPage(8, 4, BytesInput.from(BYTES4), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
w.endColumn();
w.endBlock();
w.end(new HashMap<String, String>());

ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
assertEquals("footer: "+ readFooter, 2, readFooter.getBlocks().size());
BlockMetaData rowGroup = readFooter.getBlocks().get(0);
assertEquals(c2Ends - c2Starts, rowGroup.getColumns().get(1).getTotalSize());

assertEquals(0, rowGroup.getColumns().get(0).getDictionaryPageOffset());
assertEquals(c2Starts, rowGroup.getColumns().get(1).getStartingPos());
assertEquals(c2Starts, rowGroup.getColumns().get(1).getDictionaryPageOffset());
assertEquals(c2p1Starts, rowGroup.getColumns().get(1).getFirstDataPageOffset());

BlockMetaData rowGroup2 = readFooter.getBlocks().get(1);
assertEquals(0, rowGroup2.getColumns().get(0).getDictionaryPageOffset());
assertEquals(c1Block2Starts, rowGroup2.getColumns().get(0).getStartingPos());
assertEquals(c1p1Block2Starts, rowGroup2.getColumns().get(0).getFirstDataPageOffset());
assertEquals(c1Block2Ends - c1Block2Starts, rowGroup2.getColumns().get(0).getTotalSize());

HashSet<Encoding> expectedEncoding = new HashSet<Encoding>();
expectedEncoding.add(PLAIN);
expectedEncoding.add(BIT_PACKED);
assertEquals(expectedEncoding, rowGroup.getColumns().get(0).getEncodings());

ParquetInputSplit split = new ParquetInputSplit(path, 0, w.getPos(),null,
readFooter.getBlocks(), SCHEMA.toString(),
readFooter.getFileMetaData().getSchema().toString(),
readFooter.getFileMetaData().getKeyValueMetaData(),
null);
ParquetInputFormat input = new ParquetInputFormat();
configuration.set(READ_SUPPORT_CLASS, GroupReadSupport.class.getName());
TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt_0_1_m_1_1");
TaskAttemptContext taskContext = ContextUtil.newTaskAttemptContext(configuration, taskAttemptID);
RecordReader<Void, ArrayWritable> reader = input.createRecordReader(split, taskContext);
assertTrue(reader instanceof ParquetRecordReader);
//RowGroup.file_offset is checked here
reader.initialize(split, taskContext);
reader.close();
}

@Test
public void testWriteEmptyBlock() throws Exception {
File testFile = temp.newFile();