From 453a6cc9ef6bda9bd963fab05e59ab6f51389dfa Mon Sep 17 00:00:00 2001 From: Felix Schmalzel <7929762+fschmalzel@users.noreply.github.com> Date: Tue, 9 Feb 2021 10:28:09 +0100 Subject: [PATCH 1/4] PARQUET-1982: Random access to row groups in ParquetFileReader Adds a method readRowGroup(BlockMetaData) to allow random access to PageReadStores via BlockMetaData, which can be obtained using the getRowGroups() method. This is similar to the existing method getDictionaryReader(BlockMetaData) that already exists. With random access the reader can be reused if for example someone needs to go back a row group. This would improve performance because we don't need to open the file again and read the metadata. --- .../parquet/hadoop/ParquetFileReader.java | 136 ++++++-- .../hadoop/TestParquetReaderRandomAccess.java | 292 ++++++++++++++++++ .../statistics/DataGenerationContext.java | 85 +++++ .../parquet/statistics/RandomValues.java | 4 +- .../parquet/statistics/TestStatistics.java | 57 ---- 5 files changed, 481 insertions(+), 93 deletions(-) create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/statistics/DataGenerationContext.java diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 791f9ef188..f6ab8cbe55 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -888,20 +888,46 @@ public void appendTo(ParquetFileWriter writer) throws IOException { writer.appendRowGroups(f, blocks, true); } + /** + * Reads all the columns requested from the row group at the specified block. + * + * @param block the metadata for the requested RowGroup + * @throws IOException if an error occurs while reading + * @return the PageReadStore which can provide PageReaders for each column. + */ + public PageReadStore readRowGroup(BlockMetaData block) throws IOException { + return internalReadRowGroup(block); + } + /** * Reads all the columns requested from the row group at the current file position. * @throws IOException if an error occurs while reading * @return the PageReadStore which can provide PageReaders for each column. 
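A minimal sketch of how this new entry point might be called once the patch is applied. The file path is hypothetical, and note that a later commit in this series changes the parameter from BlockMetaData to an int block index:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.metadata.BlockMetaData;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    import java.io.IOException;
    import java.util.List;

    public class RandomAccessSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        ParquetReadOptions options = ParquetReadOptions.builder().build();
        // Hypothetical path; any Parquet file with more than one row group works.
        HadoopInputFile in = HadoopInputFile.fromPath(new Path("/tmp/example.parquet"), conf);

        try (ParquetFileReader reader = new ParquetFileReader(in, options)) {
          List<BlockMetaData> rowGroups = reader.getRowGroups();
          // Row groups can now be visited in any order, e.g. last to first,
          // without reopening the file or re-reading the footer.
          for (int i = rowGroups.size() - 1; i >= 0; i--) {
            PageReadStore pages = reader.readRowGroup(rowGroups.get(i));
            System.out.println("row group " + i + " has " + pages.getRowCount() + " rows");
          }
        }
      }
    }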
*/ public PageReadStore readNextRowGroup() throws IOException { - if (currentBlock == blocks.size()) { + if (currentBlock >= blocks.size()) { + return null; + } + this.currentRowGroup = internalReadRowGroup(blocks.get(currentBlock)); + + // avoid re-reading bytes the dictionary reader is used after this call + if (nextDictionaryReader != null) { + nextDictionaryReader.setRowGroup(currentRowGroup); + } + + advanceToNextBlock(); + + return currentRowGroup; + } + + private ColumnChunkPageReadStore internalReadRowGroup(BlockMetaData block) throws IOException { + if (block == null) { return null; } - BlockMetaData block = blocks.get(currentBlock); if (block.getRowCount() == 0) { throw new RuntimeException("Illegal row group of 0 rows"); } - this.currentRowGroup = new ColumnChunkPageReadStore(block.getRowCount()); + ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(block.getRowCount()); // prepare the list of consecutive parts to read them in one scan List allParts = new ArrayList(); ConsecutivePartList currentParts = null; @@ -920,22 +946,52 @@ public PageReadStore readNextRowGroup() throws IOException { } } // actually read all the chunks - ChunkListBuilder builder = new ChunkListBuilder(); + ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount()); for (ConsecutivePartList consecutiveChunks : allParts) { consecutiveChunks.readAll(f, builder); } for (Chunk chunk : builder.build()) { - readChunkPages(chunk, block); + readChunkPages(chunk, block, rowGroup); } - // avoid re-reading bytes the dictionary reader is used after this call - if (nextDictionaryReader != null) { - nextDictionaryReader.setRowGroup(currentRowGroup); + return rowGroup; + } + + /** + * Reads all the columns requested from the specified row group. It may skip specific pages based on the column + * indexes according to the actual filter. As the rows are not aligned among the pages of the different columns row + * synchronization might be required. See the documentation of the class SynchronizingColumnReader for details. + * + * @param blockIndex the index of the requested block + * @return the PageReadStore which can provide PageReaders for each column or null if there are no rows in this block + * @throws IOException if an error occurs while reading + */ + public PageReadStore readFilteredRowGroup(int blockIndex) throws IOException { + if (blockIndex < 0 || blockIndex >= blocks.size()) { + return null; } + BlockMetaData block = blocks.get(blockIndex); - advanceToNextBlock(); + // Filtering not required -> fall back to the non-filtering path + if (!options.useColumnIndexFilter() || !FilterCompat.isFilteringRequired(options.getRecordFilter())) { + return internalReadRowGroup(block); + } - return currentRowGroup; + if (block.getRowCount() == 0) { + throw new RuntimeException("Illegal row group of 0 rows"); + } + RowRanges rowRanges = getRowRanges(blockIndex); + long rowCount = rowRanges.rowCount(); + if (rowCount == 0) { + // There are no matching rows -> returning null + return null; + } + if (rowCount == block.getRowCount()) { + // All rows are matching -> fall back to the non-filtering path + return internalReadRowGroup(block); + } + + return internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(blockIndex)); } /** @@ -945,13 +1001,13 @@ public PageReadStore readNextRowGroup() throws IOException { * details. 
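A rough illustration of the filtered variant added here, using only calls that appear elsewhere in this patch series; the column name and predicate are made up. The reader falls back to the unfiltered path when column-index filtering is disabled or the filter matches every row, and returns null when no rows in the group match:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.filter2.compat.FilterCompat;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    import java.io.IOException;

    import static org.apache.parquet.filter2.predicate.FilterApi.eq;
    import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;

    public class FilteredRandomAccessSketch {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Hypothetical column "flag"; only rows where flag == 1 should be materialized.
        FilterCompat.Filter filter = FilterCompat.get(eq(longColumn("flag"), 1L));
        ParquetReadOptions options = ParquetReadOptions.builder()
            .withRecordFilter(filter)
            .useColumnIndexFilter(true)
            .build();

        HadoopInputFile in = HadoopInputFile.fromPath(new Path("/tmp/example.parquet"), conf);
        try (ParquetFileReader reader = new ParquetFileReader(in, options)) {
          int rowGroupCount = reader.getRowGroups().size();
          for (int i = rowGroupCount - 1; i >= 0; i--) {
            // Pages may be skipped based on the column index; null means no row in this group matches.
            PageReadStore pages = reader.readFilteredRowGroup(i);
            if (pages == null) {
              continue;
            }
            System.out.println("row group " + i + ": " + pages.getRowCount() + " matching rows");
          }
        }
      }
    }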
* * @return the PageReadStore which can provide PageReaders for each column - * @throws IOException - * if any I/O error occurs while reading + * @throws IOException if an error occurs while reading */ public PageReadStore readNextFilteredRowGroup() throws IOException { if (currentBlock == blocks.size()) { return null; } + // Filtering not required -> fall back to the non-filtering path if (!options.useColumnIndexFilter() || !FilterCompat.isFilteringRequired(options.getRecordFilter())) { return readNextRowGroup(); } @@ -959,7 +1015,6 @@ public PageReadStore readNextFilteredRowGroup() throws IOException { if (block.getRowCount() == 0) { throw new RuntimeException("Illegal row group of 0 rows"); } - ColumnIndexStore ciStore = getColumnIndexStore(currentBlock); RowRanges rowRanges = getRowRanges(currentBlock); long rowCount = rowRanges.rowCount(); if (rowCount == 0) { @@ -972,9 +1027,22 @@ public PageReadStore readNextFilteredRowGroup() throws IOException { return readNextRowGroup(); } - this.currentRowGroup = new ColumnChunkPageReadStore(rowRanges); + this.currentRowGroup = internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(currentBlock)); + + // avoid re-reading bytes the dictionary reader is used after this call + if (nextDictionaryReader != null) { + nextDictionaryReader.setRowGroup(currentRowGroup); + } + + advanceToNextBlock(); + + return this.currentRowGroup; + } + + private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData block, RowRanges rowRanges, ColumnIndexStore ciStore) throws IOException { + ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges); // prepare the list of consecutive parts to read them in one scan - ChunkListBuilder builder = new ChunkListBuilder(); + ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount()); List allParts = new ArrayList(); ConsecutivePartList currentParts = null; for (ColumnChunkMetaData mc : block.getColumns()) { @@ -1005,31 +1073,24 @@ public PageReadStore readNextFilteredRowGroup() throws IOException { consecutiveChunks.readAll(f, builder); } for (Chunk chunk : builder.build()) { - readChunkPages(chunk, block); + readChunkPages(chunk, block, rowGroup); } - // avoid re-reading bytes the dictionary reader is used after this call - if (nextDictionaryReader != null) { - nextDictionaryReader.setRowGroup(currentRowGroup); - } - - advanceToNextBlock(); - - return currentRowGroup; + return rowGroup; } - private void readChunkPages(Chunk chunk, BlockMetaData block) throws IOException { + private void readChunkPages(Chunk chunk, BlockMetaData block, ColumnChunkPageReadStore rowGroup) throws IOException { if (null == fileDecryptor || fileDecryptor.plaintextFile()) { - currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages()); + rowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages()); return; } // Encrypted file ColumnPath columnPath = ColumnPath.get(chunk.descriptor.col.getPath()); InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(columnPath); if (!columnDecryptionSetup.isEncrypted()) { // plaintext column - currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages()); + rowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages()); } else { // encrypted column - currentRowGroup.addColumn(chunk.descriptor.col, + rowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages(columnDecryptionSetup.getMetaDataDecryptor(), columnDecryptionSetup.getDataDecryptor(), fileDecryptor.getFileAAD(), block.getOrdinal(), 
columnDecryptionSetup.getOrdinal())); } @@ -1315,8 +1376,13 @@ private class ChunkData { private final Map map = new HashMap<>(); private ChunkDescriptor lastDescriptor; + private final long rowCount; private SeekableInputStream f; + public ChunkListBuilder(long rowCount) { + this.rowCount = rowCount; + } + void add(ChunkDescriptor descriptor, List buffers, SeekableInputStream f) { ChunkData data = map.get(descriptor); if (data == null) { @@ -1345,9 +1411,9 @@ List build() { ChunkData data = entry.getValue(); if (descriptor.equals(lastDescriptor)) { // because of a bug, the last chunk might be larger than descriptor.size - chunks.add(new WorkaroundChunk(lastDescriptor, data.buffers, f, data.offsetIndex)); + chunks.add(new WorkaroundChunk(lastDescriptor, data.buffers, f, data.offsetIndex, rowCount)); } else { - chunks.add(new Chunk(descriptor, data.buffers, data.offsetIndex)); + chunks.add(new Chunk(descriptor, data.buffers, data.offsetIndex, rowCount)); } } return chunks; @@ -1362,16 +1428,18 @@ private class Chunk { protected final ChunkDescriptor descriptor; protected final ByteBufferInputStream stream; final OffsetIndex offsetIndex; + final long rowCount; /** * @param descriptor descriptor for the chunk * @param buffers ByteBuffers that contain the chunk * @param offsetIndex the offset index for this column; might be null */ - public Chunk(ChunkDescriptor descriptor, List buffers, OffsetIndex offsetIndex) { + public Chunk(ChunkDescriptor descriptor, List buffers, OffsetIndex offsetIndex, long rowCount) { this.descriptor = descriptor; this.stream = ByteBufferInputStream.wrap(buffers); this.offsetIndex = offsetIndex; + this.rowCount = rowCount; } protected PageHeader readPageHeader() throws IOException { @@ -1518,7 +1586,7 @@ public ColumnChunkPageReader readAllPages(BlockCipher.Decryptor headerBlockDecry } BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec()); return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage, offsetIndex, - blocks.get(currentBlock).getRowCount(), pageBlockDecryptor, aadPrefix, rowGroupOrdinal, columnOrdinal); + rowCount, pageBlockDecryptor, aadPrefix, rowGroupOrdinal, columnOrdinal); } private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) { @@ -1556,8 +1624,8 @@ private class WorkaroundChunk extends Chunk { * @param descriptor the descriptor of the chunk * @param f the file stream positioned at the end of this chunk */ - private WorkaroundChunk(ChunkDescriptor descriptor, List buffers, SeekableInputStream f, OffsetIndex offsetIndex) { - super(descriptor, buffers, offsetIndex); + private WorkaroundChunk(ChunkDescriptor descriptor, List buffers, SeekableInputStream f, OffsetIndex offsetIndex, long rowCount) { + super(descriptor, buffers, offsetIndex, rowCount); this.f = f; } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java new file mode 100644 index 0000000000..d8a3e3ba2a --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.column.page.DataPage; +import org.apache.parquet.column.page.DataPageV1; +import org.apache.parquet.column.page.DataPageV2; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroup; +import org.apache.parquet.hadoop.metadata.BlockMetaData; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.schema.*; +import org.apache.parquet.statistics.DataGenerationContext; +import org.apache.parquet.statistics.RandomValues; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; +import static org.apache.parquet.schema.Type.Repetition.REQUIRED; +import static org.junit.Assert.*; + +public class TestParquetReaderRandomAccess { + private static final int KILOBYTE = 1 << 10; + private static final long RANDOM_SEED = 7174252115631550700L; + + @Rule + public final TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void test() throws IOException { + Random random = new Random(RANDOM_SEED); + + File file = temp.newFile("test_file.parquet"); + file.delete(); + + int blockSize = 500 * KILOBYTE; + int pageSize = 20 * KILOBYTE; + + List contexts = new ArrayList<>(); + + for (boolean enableDictionary : new boolean[]{false, true}) { + for (WriterVersion writerVersion : new WriterVersion[]{WriterVersion.PARQUET_1_0, WriterVersion.PARQUET_2_0}) { + contexts.add( + new DataContextRandom(random.nextLong(), file, blockSize, + pageSize, enableDictionary, writerVersion)); + contexts.add( + new DataContextRandomAndSequential(random.nextLong(), file, blockSize, + pageSize, enableDictionary, writerVersion)); + } + } + + for (DataContext context : contexts) { + DataGenerationContext.writeAndTest(context); + } + } + + public static abstract class DataContext extends DataGenerationContext.WriteContext 
{ + + private static final int recordCount = 100_000; + + private final Random random; + + private final List> randomGenerators; + + protected final ColumnDescriptor testColumnDescriptor; + + public DataContext(long seed, File path, int blockSize, int pageSize, boolean enableDictionary, ParquetProperties.WriterVersion version) throws IOException { + super(path, buildSchema(seed), blockSize, pageSize, enableDictionary, true, version); + + this.random = new Random(seed); + + int fixedLength = schema.getType("fixed-binary").asPrimitiveType().getTypeLength(); + + this.testColumnDescriptor = super.schema.getColumnDescription(new String[]{"unconstrained-i64"}); + + randomGenerators = Arrays.asList( + new RandomValues.IntGenerator(random.nextLong()), + new RandomValues.LongGenerator(random.nextLong()), + new RandomValues.FloatGenerator(random.nextLong()), + new RandomValues.DoubleGenerator(random.nextLong()), + new RandomValues.StringGenerator(random.nextLong()), + new RandomValues.FixedGenerator(random.nextLong(), fixedLength), + new RandomValues.UnconstrainedIntGenerator(random.nextLong()), + new RandomValues.UnconstrainedLongGenerator(random.nextLong()) + ); + } + + private static MessageType buildSchema(long seed) { + Random random = new Random(seed); + int fixedBinaryLength = random.nextInt(21) + 1; + + return new MessageType("schema", + new PrimitiveType(OPTIONAL, INT32, "i32"), + new PrimitiveType(OPTIONAL, INT64, "i64"), + new PrimitiveType(OPTIONAL, FLOAT, "sngl"), + new PrimitiveType(OPTIONAL, DOUBLE, "dbl"), + new PrimitiveType(OPTIONAL, BINARY, "strings"), + new PrimitiveType(OPTIONAL, FIXED_LEN_BYTE_ARRAY, fixedBinaryLength, "fixed-binary"), + new PrimitiveType(REQUIRED, INT32, "unconstrained-i32"), + new PrimitiveType(REQUIRED, INT64, "unconstrained-i64") + ); + } + + @Override + public void write(ParquetWriter writer) throws IOException { + for (int index = 0; index < recordCount; index++) { + Group group = new SimpleGroup(super.schema); + + for (int column = 0, columnCnt = schema.getFieldCount(); column < columnCnt; ++column) { + Type type = schema.getType(column); + RandomValues.RandomValueGenerator generator = randomGenerators.get(column); + if (type.isRepetition(OPTIONAL) && generator.shouldGenerateNull()) { + continue; + } + switch (type.asPrimitiveType().getPrimitiveTypeName()) { + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + case INT96: + group.append(type.getName(), ((RandomValues.RandomBinaryBase) generator).nextBinaryValue()); + break; + case INT32: + group.append(type.getName(), (Integer) generator.nextValue()); + break; + case INT64: + group.append(type.getName(), (Long) generator.nextValue()); + break; + case FLOAT: + group.append(type.getName(), (Float) generator.nextValue()); + break; + case DOUBLE: + group.append(type.getName(), (Double) generator.nextValue()); + break; + case BOOLEAN: + group.append(type.getName(), (Boolean) generator.nextValue()); + break; + } + } + writer.write(group); + } + } + + public static byte[] getBytesFromPage(DataPage page) throws IOException { + if (page instanceof DataPageV1) { + return ((DataPageV1) page).getBytes().toByteArray(); + } else if (page instanceof DataPageV2) { + return ((DataPageV2) page).getData().toByteArray(); + } else { + fail(); + return null; + } + } + + @Override + public void test() throws IOException { + Configuration configuration = new Configuration(); + ParquetReadOptions options = ParquetReadOptions.builder().build(); + + List testPageBytes = new ArrayList<>(); + List testPageValueCounts = new ArrayList<>(); + 
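      // Two-pass check: first read every row group sequentially with readNextRowGroup() and record
      // the first page (bytes and value count) of the test column as a reference; then re-open the
      // file, shuffle the row-group indexes, and let the concrete subclass read the groups out of
      // order (or interleaved with sequential reads), asserting each matches the recorded reference.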
+ try (ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(super.fsPath, configuration), options)) { + PageReadStore pages; + while ((pages = reader.readNextRowGroup()) != null) { + DataPage page = pages.getPageReader(testColumnDescriptor).readPage(); + testPageBytes.add(getBytesFromPage(page)); + testPageValueCounts.add(page.getValueCount()); + } + } + + try (ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(super.fsPath, configuration), options)) { + List blocks = reader.getRowGroups(); + + // Randomize indexes + List indexes = new ArrayList<>(); + for (int i = 0; i < blocks.size(); i++) { + for (int j = 0; j < 4; j++) { + indexes.add(i); + } + } + + Collections.shuffle(indexes, random); + + test(reader, indexes, testPageBytes, testPageValueCounts); + } + } + + protected abstract void test(ParquetFileReader reader, List indexes, List idPageBytes, List idPageValueCounts) throws IOException; + } + + public static class DataContextRandom extends DataContext { + + public DataContextRandom(long seed, File path, int blockSize, int pageSize, boolean enableDictionary, ParquetProperties.WriterVersion version) throws IOException { + super(seed, path, blockSize, pageSize, enableDictionary, version); + } + + @Override + protected void test(ParquetFileReader reader, List indexes, List testPageBytes, List testPageValueCounts) throws IOException { + List blocks = reader.getRowGroups(); + + for (int index: indexes) { + PageReadStore pages = reader.readRowGroup(blocks.get(index)); + DataPage page = pages.getPageReader(testColumnDescriptor).readPage(); + assertArrayEquals(testPageBytes.get(index), getBytesFromPage(page)); + assertEquals((int) testPageValueCounts.get(index), page.getValueCount()); + } + } + } + + public static class DataContextRandomAndSequential extends DataContext { + + public DataContextRandomAndSequential(long seed, File path, int blockSize, int pageSize, boolean enableDictionary, ParquetProperties.WriterVersion version) throws IOException { + super(seed, path, blockSize, pageSize, enableDictionary, version); + } + + @Override + protected void test(ParquetFileReader reader, List indexes, List testPageBytes, List testPageValueCounts) throws IOException { + List blocks = reader.getRowGroups(); + int splitPoint = indexes.size()/2; + + { + PageReadStore pages = reader.readNextRowGroup(); + DataPage page = pages.getPageReader(testColumnDescriptor).readPage(); + assertArrayEquals(testPageBytes.get(0), getBytesFromPage(page)); + assertEquals((int) testPageValueCounts.get(0), page.getValueCount()); + } + for (int i = 0; i < splitPoint; i++) { + int index = indexes.get(i); + PageReadStore pages = reader.readRowGroup(blocks.get(index)); + DataPage page = pages.getPageReader(testColumnDescriptor).readPage(); + assertArrayEquals(testPageBytes.get(index), getBytesFromPage(page)); + assertEquals((int) testPageValueCounts.get(index), page.getValueCount()); + } + { + PageReadStore pages = reader.readNextRowGroup(); + DataPage page = pages.getPageReader(testColumnDescriptor).readPage(); + assertArrayEquals(testPageBytes.get(1), getBytesFromPage(page)); + assertEquals((int) testPageValueCounts.get(1), page.getValueCount()); + } + for (int i = splitPoint; i < indexes.size(); i++) { + int index = indexes.get(i); + PageReadStore pages = reader.readRowGroup(blocks.get(index)); + DataPage page = pages.getPageReader(testColumnDescriptor).readPage(); + assertArrayEquals(testPageBytes.get(index), getBytesFromPage(page)); + assertEquals((int) 
testPageValueCounts.get(index), page.getValueCount()); + } + { + PageReadStore pages = reader.readNextRowGroup(); + DataPage page = pages.getPageReader(testColumnDescriptor).readPage(); + assertArrayEquals(testPageBytes.get(2), getBytesFromPage(page)); + assertEquals((int) testPageValueCounts.get(2), page.getValueCount()); + } + } + } + +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/DataGenerationContext.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/DataGenerationContext.java new file mode 100644 index 0000000000..c3c61c49b6 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/DataGenerationContext.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.statistics; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; + +import java.io.File; +import java.io.IOException; + +public class DataGenerationContext { + public static abstract class WriteContext { + protected final File path; + protected final Path fsPath; + protected final MessageType schema; + protected final int blockSize; + protected final int pageSize; + protected final boolean enableDictionary; + protected final boolean enableValidation; + protected final ParquetProperties.WriterVersion version; + + public WriteContext(File path, MessageType schema, int blockSize, int pageSize, boolean enableDictionary, boolean enableValidation, ParquetProperties.WriterVersion version) throws IOException { + this.path = path; + this.fsPath = new Path(path.toString()); + this.schema = schema; + this.blockSize = blockSize; + this.pageSize = pageSize; + this.enableDictionary = enableDictionary; + this.enableValidation = enableValidation; + this.version = version; + } + + public abstract void write(ParquetWriter writer) throws IOException; + + public abstract void test() throws IOException; + } + + public static void writeAndTest(WriteContext context) throws IOException { + // Create the configuration, and then apply the schema to our configuration. 
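    // writeAndTest drives each WriteContext: it writes the file with the requested block/page sizes
    // and writer version, closes the writer so the footer is complete, runs the context's test()
    // validation against the written file, and finally deletes the temporary file.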
+ Configuration configuration = new Configuration(); + GroupWriteSupport.setSchema(context.schema, configuration); + GroupWriteSupport groupWriteSupport = new GroupWriteSupport(); + + // Create the writer properties + final int blockSize = context.blockSize; + final int pageSize = context.pageSize; + final int dictionaryPageSize = pageSize; + final boolean enableDictionary = context.enableDictionary; + final boolean enableValidation = context.enableValidation; + ParquetProperties.WriterVersion writerVersion = context.version; + CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED; + + try (ParquetWriter writer = new ParquetWriter(context.fsPath, + groupWriteSupport, codec, blockSize, pageSize, dictionaryPageSize, + enableDictionary, enableValidation, writerVersion, configuration)) { + context.write(writer); + } + + context.test(); + + context.path.delete(); + } +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java index 191e397c14..9545aefc7d 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java @@ -33,7 +33,7 @@ public class RandomValues { private static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz1234567890"; - static abstract class RandomValueGenerator> implements Supplier { + public static abstract class RandomValueGenerator> implements Supplier { private final Random random; protected RandomValueGenerator(long seed) { @@ -94,7 +94,7 @@ public T get() { } } - static abstract class RandomBinaryBase> extends RandomValueGenerator { + public static abstract class RandomBinaryBase> extends RandomValueGenerator { protected final int bufferLength; protected final byte[] buffer; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java index e5cd40f31b..9ec5560d41 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java @@ -20,7 +20,6 @@ package org.apache.parquet.statistics; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.impl.ColumnReaderImpl; @@ -36,8 +35,6 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.example.GroupWriteSupport; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.PrimitiveConverter; @@ -72,60 +69,6 @@ public class TestStatistics { private static final int MEGABYTE = 1 << 20; private static final long RANDOM_SEED = 1441990701846L; //System.currentTimeMillis(); - public static class DataGenerationContext { - public static abstract class WriteContext { - protected final File path; - protected final Path fsPath; - protected final MessageType schema; - protected final int blockSize; - protected final int pageSize; - protected final boolean enableDictionary; - protected final boolean enableValidation; - 
protected final ParquetProperties.WriterVersion version; - - public WriteContext(File path, MessageType schema, int blockSize, int pageSize, boolean enableDictionary, boolean enableValidation, ParquetProperties.WriterVersion version) throws IOException { - this.path = path; - this.fsPath = new Path(path.toString()); - this.schema = schema; - this.blockSize = blockSize; - this.pageSize = pageSize; - this.enableDictionary = enableDictionary; - this.enableValidation = enableValidation; - this.version = version; - } - - public abstract void write(ParquetWriter writer) throws IOException; - public abstract void test() throws IOException; - } - - public static void writeAndTest(WriteContext context) throws IOException { - // Create the configuration, and then apply the schema to our configuration. - Configuration configuration = new Configuration(); - GroupWriteSupport.setSchema(context.schema, configuration); - GroupWriteSupport groupWriteSupport = new GroupWriteSupport(); - - // Create the writer properties - final int blockSize = context.blockSize; - final int pageSize = context.pageSize; - final int dictionaryPageSize = pageSize; - final boolean enableDictionary = context.enableDictionary; - final boolean enableValidation = context.enableValidation; - ParquetProperties.WriterVersion writerVersion = context.version; - CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED; - - ParquetWriter writer = new ParquetWriter(context.fsPath, - groupWriteSupport, codec, blockSize, pageSize, dictionaryPageSize, - enableDictionary, enableValidation, writerVersion, configuration); - - context.write(writer); - writer.close(); - - context.test(); - - context.path.delete(); - } - } - public static class SingletonPageReader implements PageReader { private final DictionaryPage dict; private final DataPage data; From ffd33faca1ca4e6f01dd83378ad6dce7e0876f05 Mon Sep 17 00:00:00 2001 From: Felix Schmalzel <7929762+fschmalzel@users.noreply.github.com> Date: Wed, 24 Feb 2021 10:58:44 +0100 Subject: [PATCH 2/4] PARQUET-1982: Better tests Add test for filtered random access Reads all pages of a row group Checks all columns of a page --- .../hadoop/TestParquetReaderRandomAccess.java | 309 +++++++++++------- 1 file changed, 194 insertions(+), 115 deletions(-) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java index d8a3e3ba2a..aefc2e2911 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java @@ -20,18 +20,22 @@ import org.apache.hadoop.conf.Configuration; import org.apache.parquet.ParquetReadOptions; -import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.ParquetProperties.WriterVersion; -import org.apache.parquet.column.page.DataPage; -import org.apache.parquet.column.page.DataPageV1; -import org.apache.parquet.column.page.DataPageV2; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.example.data.Group; import org.apache.parquet.example.data.simple.SimpleGroup; +import org.apache.parquet.example.data.simple.convert.GroupRecordConverter; +import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.util.HadoopInputFile; -import 
org.apache.parquet.schema.*; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.io.RecordReader; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; import org.apache.parquet.statistics.DataGenerationContext; import org.apache.parquet.statistics.RandomValues; import org.junit.Rule; @@ -46,16 +50,28 @@ import java.util.List; import java.util.Random; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static org.apache.parquet.filter2.predicate.FilterApi.eq; +import static org.apache.parquet.filter2.predicate.FilterApi.longColumn; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; import static org.apache.parquet.schema.Type.Repetition.REQUIRED; import static org.junit.Assert.*; +/** + * This tests the random access methods of the ParquetFileReader, specifically: + *
    + *
<ul>
 + *   <li>{@link ParquetFileReader#readRowGroup(BlockMetaData)}</li>
 + *   <li>{@link ParquetFileReader#readFilteredRowGroup(int)}</li>
 + * </ul>
+ * + * For this we use two columns. + * Column "i64" that starts at value 0 and counts up. + * Column "i64_flip" that start at value 1 and flips between 1 and 0. + * + * With these two column we can validate the read data without holding the written data in memory. + * The "i64_flip" column is mainly used to test the filtering. + * We filter "i64_flip" to be equal to one, that means all values in "i64" have to be even. + */ public class TestParquetReaderRandomAccess { private static final int KILOBYTE = 1 << 10; private static final long RANDOM_SEED = 7174252115631550700L; @@ -70,8 +86,8 @@ public void test() throws IOException { File file = temp.newFile("test_file.parquet"); file.delete(); - int blockSize = 500 * KILOBYTE; - int pageSize = 20 * KILOBYTE; + int blockSize = 50 * KILOBYTE; + int pageSize = 2 * KILOBYTE; List contexts = new ArrayList<>(); @@ -91,51 +107,56 @@ public void test() throws IOException { } } - public static abstract class DataContext extends DataGenerationContext.WriteContext { + public static class SequentialLongGenerator extends RandomValues.RandomValueGenerator { + private long value= 0; - private static final int recordCount = 100_000; + protected SequentialLongGenerator() { + super(0L); + } - private final Random random; + @Override + public Long nextValue() { + return value++; + } + } - private final List> randomGenerators; + public static class SequentialFlippingLongGenerator extends RandomValues.RandomValueGenerator { + private long value = 0; - protected final ColumnDescriptor testColumnDescriptor; + protected SequentialFlippingLongGenerator() { + super(0L); + } - public DataContext(long seed, File path, int blockSize, int pageSize, boolean enableDictionary, ParquetProperties.WriterVersion version) throws IOException { - super(path, buildSchema(seed), blockSize, pageSize, enableDictionary, true, version); + @Override + public Long nextValue() { + value = value == 0 ? 
1 : 0; + return value; + } + } - this.random = new Random(seed); + public static abstract class DataContext extends DataGenerationContext.WriteContext { - int fixedLength = schema.getType("fixed-binary").asPrimitiveType().getTypeLength(); + private static final int recordCount = 1_000_000; - this.testColumnDescriptor = super.schema.getColumnDescription(new String[]{"unconstrained-i64"}); + private final List> randomGenerators; + private final Random random; + private final FilterCompat.Filter filter; - randomGenerators = Arrays.asList( - new RandomValues.IntGenerator(random.nextLong()), - new RandomValues.LongGenerator(random.nextLong()), - new RandomValues.FloatGenerator(random.nextLong()), - new RandomValues.DoubleGenerator(random.nextLong()), - new RandomValues.StringGenerator(random.nextLong()), - new RandomValues.FixedGenerator(random.nextLong(), fixedLength), - new RandomValues.UnconstrainedIntGenerator(random.nextLong()), - new RandomValues.UnconstrainedLongGenerator(random.nextLong()) - ); - } + public DataContext(long seed, File path, int blockSize, int pageSize, boolean enableDictionary, ParquetProperties.WriterVersion version) throws IOException { + super(path, buildSchema(), blockSize, pageSize, enableDictionary, true, version); - private static MessageType buildSchema(long seed) { - Random random = new Random(seed); - int fixedBinaryLength = random.nextInt(21) + 1; + this.random = new Random(seed); + this.randomGenerators = Arrays.asList( + new SequentialLongGenerator(), + new SequentialFlippingLongGenerator()); + this.filter = FilterCompat.get(eq(longColumn("i64_flip"), 1L)); + } + + private static MessageType buildSchema() { return new MessageType("schema", - new PrimitiveType(OPTIONAL, INT32, "i32"), - new PrimitiveType(OPTIONAL, INT64, "i64"), - new PrimitiveType(OPTIONAL, FLOAT, "sngl"), - new PrimitiveType(OPTIONAL, DOUBLE, "dbl"), - new PrimitiveType(OPTIONAL, BINARY, "strings"), - new PrimitiveType(OPTIONAL, FIXED_LEN_BYTE_ARRAY, fixedBinaryLength, "fixed-binary"), - new PrimitiveType(REQUIRED, INT32, "unconstrained-i32"), - new PrimitiveType(REQUIRED, INT64, "unconstrained-i64") - ); + new PrimitiveType(REQUIRED, INT64, "i64"), + new PrimitiveType(REQUIRED, INT64, "i64_flip")); } @Override @@ -149,79 +170,114 @@ public void write(ParquetWriter writer) throws IOException { if (type.isRepetition(OPTIONAL) && generator.shouldGenerateNull()) { continue; } - switch (type.asPrimitiveType().getPrimitiveTypeName()) { - case BINARY: - case FIXED_LEN_BYTE_ARRAY: - case INT96: - group.append(type.getName(), ((RandomValues.RandomBinaryBase) generator).nextBinaryValue()); - break; - case INT32: - group.append(type.getName(), (Integer) generator.nextValue()); - break; - case INT64: - group.append(type.getName(), (Long) generator.nextValue()); - break; - case FLOAT: - group.append(type.getName(), (Float) generator.nextValue()); - break; - case DOUBLE: - group.append(type.getName(), (Double) generator.nextValue()); - break; - case BOOLEAN: - group.append(type.getName(), (Boolean) generator.nextValue()); - break; - } + group.append(type.getName(), (Long) generator.nextValue()); } writer.write(group); } } - public static byte[] getBytesFromPage(DataPage page) throws IOException { - if (page instanceof DataPageV1) { - return ((DataPageV1) page).getBytes().toByteArray(); - } else if (page instanceof DataPageV2) { - return ((DataPageV2) page).getData().toByteArray(); - } else { - fail(); - return null; - } - } - @Override public void test() throws IOException { Configuration configuration 
= new Configuration(); ParquetReadOptions options = ParquetReadOptions.builder().build(); - List testPageBytes = new ArrayList<>(); - List testPageValueCounts = new ArrayList<>(); + ParquetReadOptions filterOptions = ParquetReadOptions.builder() + .copy(options) + .withRecordFilter(filter) + .useDictionaryFilter(true) + .useStatsFilter(true) + .useRecordFilter(true) + .useColumnIndexFilter(true) + .build(); + + List fromNumber = new ArrayList<>(); + List toNumber = new ArrayList<>(); + int blocks; try (ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(super.fsPath, configuration), options)) { + blocks = reader.getRowGroups().size(); PageReadStore pages; while ((pages = reader.readNextRowGroup()) != null) { - DataPage page = pages.getPageReader(testColumnDescriptor).readPage(); - testPageBytes.add(getBytesFromPage(page)); - testPageValueCounts.add(page.getValueCount()); + MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(super.schema); + RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(super.schema)); + long rowCount = pages.getRowCount(); + long from = recordReader.read().getLong("i64", 0); + for (int i = 1; i < rowCount - 1; i++) { + recordReader.read(); + } + Group group = recordReader.read(); + long to; + if (group == null) { + to = from; + } else { + to = group.getLong("i64", 0); + } + fromNumber.add(from); + toNumber.add(to); } } + // Randomize indexes + List indexes = new ArrayList<>(); + for (int i = 0; i < blocks; i++) { + for (int j = 0; j < 4; j++) { + indexes.add(i); + } + } + + Collections.shuffle(indexes, random); + try (ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(super.fsPath, configuration), options)) { - List blocks = reader.getRowGroups(); + test(reader, indexes, fromNumber, toNumber); + } - // Randomize indexes - List indexes = new ArrayList<>(); - for (int i = 0; i < blocks.size(); i++) { - for (int j = 0; j < 4; j++) { - indexes.add(i); - } - } + try (ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(super.fsPath, configuration), filterOptions)) { + testFiltered(reader, indexes, fromNumber, toNumber); + } + } - Collections.shuffle(indexes, random); + public void assertValues(PageReadStore pages, long firstValue, long lastValue) { + MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(super.schema); + RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(super.schema)); + for (long i = firstValue; i <= lastValue; i++) { + Group group = recordReader.read(); + assertEquals(i, group.getLong("i64", 0)); + assertEquals((i % 2) == 0 ? 
1 : 0, group.getLong("i64_flip", 0)); + } + boolean exceptionThrown = false; + try { + recordReader.read(); + } catch (ParquetDecodingException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } - test(reader, indexes, testPageBytes, testPageValueCounts); + public void assertFilteredValues(PageReadStore pages, long firstValue, long lastValue) { + MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(super.schema); + RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(super.schema), filter); + + for (long i = firstValue; i <= lastValue; i++) { + Group group = recordReader.read(); + if ((i % 2) == 0) { + assertEquals(i, group.getLong("i64", 0)); + assertEquals(1, group.getLong("i64_flip", 0)); + } else { + assertTrue(group == null || recordReader.shouldSkipCurrentRecord()); + } } + + boolean exceptionThrown = false; + try { + recordReader.read(); + } catch (ParquetDecodingException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); } - protected abstract void test(ParquetFileReader reader, List indexes, List idPageBytes, List idPageValueCounts) throws IOException; + protected abstract void test(ParquetFileReader reader, List indexes, List fromNumber, List toNumber) throws IOException; + protected abstract void testFiltered(ParquetFileReader reader, List indexes, List fromNumber, List toNumber) throws IOException; } public static class DataContextRandom extends DataContext { @@ -231,14 +287,19 @@ public DataContextRandom(long seed, File path, int blockSize, int pageSize, bool } @Override - protected void test(ParquetFileReader reader, List indexes, List testPageBytes, List testPageValueCounts) throws IOException { + protected void test(ParquetFileReader reader, List indexes, List fromNumber, List toNumber) throws IOException { List blocks = reader.getRowGroups(); - for (int index: indexes) { PageReadStore pages = reader.readRowGroup(blocks.get(index)); - DataPage page = pages.getPageReader(testColumnDescriptor).readPage(); - assertArrayEquals(testPageBytes.get(index), getBytesFromPage(page)); - assertEquals((int) testPageValueCounts.get(index), page.getValueCount()); + assertValues(pages, fromNumber.get(index), toNumber.get(index)); + } + } + + @Override + protected void testFiltered(ParquetFileReader reader, List indexes, List fromNumber, List toNumber) throws IOException { + for (int index: indexes) { + PageReadStore pages = reader.readFilteredRowGroup(index); + assertFilteredValues(pages, fromNumber.get(index), toNumber.get(index)); } } } @@ -250,41 +311,59 @@ public DataContextRandomAndSequential(long seed, File path, int blockSize, int p } @Override - protected void test(ParquetFileReader reader, List indexes, List testPageBytes, List testPageValueCounts) throws IOException { + protected void test(ParquetFileReader reader, List indexes, List fromNumber, List toNumber) throws IOException { List blocks = reader.getRowGroups(); int splitPoint = indexes.size()/2; { PageReadStore pages = reader.readNextRowGroup(); - DataPage page = pages.getPageReader(testColumnDescriptor).readPage(); - assertArrayEquals(testPageBytes.get(0), getBytesFromPage(page)); - assertEquals((int) testPageValueCounts.get(0), page.getValueCount()); + assertValues(pages, fromNumber.get(0), toNumber.get(0)); } for (int i = 0; i < splitPoint; i++) { int index = indexes.get(i); PageReadStore pages = reader.readRowGroup(blocks.get(index)); - DataPage page = pages.getPageReader(testColumnDescriptor).readPage(); - 
assertArrayEquals(testPageBytes.get(index), getBytesFromPage(page)); - assertEquals((int) testPageValueCounts.get(index), page.getValueCount()); + assertValues(pages, fromNumber.get(index), toNumber.get(index)); } { PageReadStore pages = reader.readNextRowGroup(); - DataPage page = pages.getPageReader(testColumnDescriptor).readPage(); - assertArrayEquals(testPageBytes.get(1), getBytesFromPage(page)); - assertEquals((int) testPageValueCounts.get(1), page.getValueCount()); + assertValues(pages, fromNumber.get(1), toNumber.get(1)); } for (int i = splitPoint; i < indexes.size(); i++) { int index = indexes.get(i); PageReadStore pages = reader.readRowGroup(blocks.get(index)); - DataPage page = pages.getPageReader(testColumnDescriptor).readPage(); - assertArrayEquals(testPageBytes.get(index), getBytesFromPage(page)); - assertEquals((int) testPageValueCounts.get(index), page.getValueCount()); + assertValues(pages, fromNumber.get(index), toNumber.get(index)); } { PageReadStore pages = reader.readNextRowGroup(); - DataPage page = pages.getPageReader(testColumnDescriptor).readPage(); - assertArrayEquals(testPageBytes.get(2), getBytesFromPage(page)); - assertEquals((int) testPageValueCounts.get(2), page.getValueCount()); + assertValues(pages, fromNumber.get(2), toNumber.get(2)); + } + } + + @Override + protected void testFiltered(ParquetFileReader reader, List indexes, List fromNumber, List toNumber) throws IOException { + int splitPoint = indexes.size()/2; + + { + PageReadStore pages = reader.readNextFilteredRowGroup(); + assertFilteredValues(pages, fromNumber.get(0), toNumber.get(0)); + } + for (int i = 0; i < splitPoint; i++) { + int index = indexes.get(i); + PageReadStore pages = reader.readFilteredRowGroup(index); + assertFilteredValues(pages, fromNumber.get(index), toNumber.get(index)); + } + { + PageReadStore pages = reader.readNextFilteredRowGroup(); + assertFilteredValues(pages, fromNumber.get(1), toNumber.get(1)); + } + for (int i = splitPoint; i < indexes.size(); i++) { + int index = indexes.get(i); + PageReadStore pages = reader.readFilteredRowGroup(index); + assertFilteredValues(pages, fromNumber.get(index), toNumber.get(index)); + } + { + PageReadStore pages = reader.readNextFilteredRowGroup(); + assertFilteredValues(pages, fromNumber.get(2), toNumber.get(2)); } } } From ff1303ae392939f35631bc107c1242be6cded017 Mon Sep 17 00:00:00 2001 From: Felix Schmalzel <7929762+fschmalzel@users.noreply.github.com> Date: Wed, 24 Feb 2021 11:32:36 +0100 Subject: [PATCH 3/4] PARQUET-1982: Accept int instead of BlockMetaData --- .../parquet/hadoop/ParquetFileReader.java | 43 +++++++++++++------ .../hadoop/TestParquetReaderRandomAccess.java | 11 ++--- 2 files changed, 34 insertions(+), 20 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index f6ab8cbe55..7edec333fc 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -891,12 +891,12 @@ public void appendTo(ParquetFileWriter writer) throws IOException { /** * Reads all the columns requested from the row group at the specified block. * - * @param block the metadata for the requested RowGroup + * @param blockIndex the index of the requested block * @throws IOException if an error occurs while reading * @return the PageReadStore which can provide PageReaders for each column. 
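The bounds handling introduced with the int-based signature can be summarized in a small sketch (hypothetical path): indexes outside the range of available row groups simply yield null instead of throwing:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.ParquetReadOptions;
    import org.apache.parquet.column.page.PageReadStore;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;

    import java.io.IOException;

    public class RowGroupBoundsSketch {
      public static void main(String[] args) throws IOException {
        HadoopInputFile in = HadoopInputFile.fromPath(new Path("/tmp/example.parquet"), new Configuration());
        try (ParquetFileReader reader = new ParquetFileReader(in, ParquetReadOptions.builder().build())) {
          int n = reader.getRowGroups().size();
          PageReadStore beforeFirst = reader.readRowGroup(-1);          // negative index -> null
          PageReadStore pastLast = reader.readRowGroup(n);              // index == number of row groups -> null
          PageReadStore first = n > 0 ? reader.readRowGroup(0) : null;  // a valid index returns the page store
          System.out.println(beforeFirst == null);  // true
          System.out.println(pastLast == null);     // true
          System.out.println(first != null);        // true for any file with at least one row group
        }
      }
    }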
*/ - public PageReadStore readRowGroup(BlockMetaData block) throws IOException { - return internalReadRowGroup(block); + public PageReadStore readRowGroup(int blockIndex) throws IOException { + return internalReadRowGroup(blockIndex); } /** @@ -905,11 +905,11 @@ public PageReadStore readRowGroup(BlockMetaData block) throws IOException { * @return the PageReadStore which can provide PageReaders for each column. */ public PageReadStore readNextRowGroup() throws IOException { - if (currentBlock >= blocks.size()) { + ColumnChunkPageReadStore rowGroup = internalReadRowGroup(currentBlock); + if (rowGroup == null) { return null; } - this.currentRowGroup = internalReadRowGroup(blocks.get(currentBlock)); - + this.currentRowGroup = rowGroup; // avoid re-reading bytes the dictionary reader is used after this call if (nextDictionaryReader != null) { nextDictionaryReader.setRowGroup(currentRowGroup); @@ -920,10 +920,11 @@ public PageReadStore readNextRowGroup() throws IOException { return currentRowGroup; } - private ColumnChunkPageReadStore internalReadRowGroup(BlockMetaData block) throws IOException { - if (block == null) { + private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOException { + if (blockIndex < 0 || blockIndex >= blocks.size()) { return null; } + BlockMetaData block = blocks.get(blockIndex); if (block.getRowCount() == 0) { throw new RuntimeException("Illegal row group of 0 rows"); } @@ -970,25 +971,27 @@ public PageReadStore readFilteredRowGroup(int blockIndex) throws IOException { if (blockIndex < 0 || blockIndex >= blocks.size()) { return null; } - BlockMetaData block = blocks.get(blockIndex); // Filtering not required -> fall back to the non-filtering path if (!options.useColumnIndexFilter() || !FilterCompat.isFilteringRequired(options.getRecordFilter())) { - return internalReadRowGroup(block); + return internalReadRowGroup(blockIndex); } + BlockMetaData block = blocks.get(blockIndex); if (block.getRowCount() == 0) { throw new RuntimeException("Illegal row group of 0 rows"); } + RowRanges rowRanges = getRowRanges(blockIndex); long rowCount = rowRanges.rowCount(); if (rowCount == 0) { // There are no matching rows -> returning null return null; } + if (rowCount == block.getRowCount()) { // All rows are matching -> fall back to the non-filtering path - return internalReadRowGroup(block); + return internalReadRowGroup(blockIndex); } return internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(blockIndex)); @@ -1141,12 +1144,19 @@ private boolean advanceToNextBlock() { * @return a DictionaryPageReadStore for the next row group */ public DictionaryPageReadStore getNextDictionaryReader() { - if (nextDictionaryReader == null && currentBlock < blocks.size()) { - this.nextDictionaryReader = getDictionaryReader(blocks.get(currentBlock)); + if (nextDictionaryReader == null) { + this.nextDictionaryReader = getDictionaryReader(currentBlock); } return nextDictionaryReader; } + public DictionaryPageReader getDictionaryReader(int blockIndex) { + if (blockIndex < 0 || blockIndex >= blocks.size()) { + return null; + } + return new DictionaryPageReader(this, blocks.get(blockIndex)); + } + public DictionaryPageReader getDictionaryReader(BlockMetaData block) { return new DictionaryPageReader(this, block); } @@ -1228,6 +1238,13 @@ private DictionaryPage readCompressedDictionary( converter.getEncoding(dictHeader.getEncoding())); } + public BloomFilterReader getBloomFilterDataReader(int blockIndex) { + if (blockIndex < 0 || blockIndex >= blocks.size()) { + return null; 
+ } + return new BloomFilterReader(this, blocks.get(blockIndex)); + } + public BloomFilterReader getBloomFilterDataReader(BlockMetaData block) { return new BloomFilterReader(this, block); } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java index aefc2e2911..65c23226da 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java @@ -27,7 +27,6 @@ import org.apache.parquet.example.data.simple.SimpleGroup; import org.apache.parquet.example.data.simple.convert.GroupRecordConverter; import org.apache.parquet.filter2.compat.FilterCompat; -import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.hadoop.util.HadoopInputFile; import org.apache.parquet.io.ColumnIOFactory; import org.apache.parquet.io.MessageColumnIO; @@ -60,7 +59,7 @@ /** * This tests the random access methods of the ParquetFileReader, specifically: *
    - *
<li>{@link ParquetFileReader#readRowGroup(BlockMetaData)}</li>
 + *   <li>{@link ParquetFileReader#readRowGroup(int)}</li>
 *   <li>{@link ParquetFileReader#readFilteredRowGroup(int)}</li>
 * </ul>
   *
@@ -288,9 +287,8 @@ public DataContextRandom(long seed, File path, int blockSize, int pageSize, bool

     @Override
     protected void test(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber) throws IOException {
-      List<BlockMetaData> blocks = reader.getRowGroups();
       for (int index: indexes) {
-        PageReadStore pages = reader.readRowGroup(blocks.get(index));
+        PageReadStore pages = reader.readRowGroup(index);
         assertValues(pages, fromNumber.get(index), toNumber.get(index));
       }
     }
@@ -312,7 +310,6 @@ public DataContextRandomAndSequential(long seed, File path, int blockSize, int p

     @Override
     protected void test(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber) throws IOException {
-      List<BlockMetaData> blocks = reader.getRowGroups();
       int splitPoint = indexes.size()/2;

       {
@@ -321,7 +318,7 @@ protected void test(ParquetFileReader reader, List indexes, List
       }
       for (int i = 0; i < splitPoint; i++) {
         int index = indexes.get(i);
-        PageReadStore pages = reader.readRowGroup(blocks.get(index));
+        PageReadStore pages = reader.readRowGroup(index);
         assertValues(pages, fromNumber.get(index), toNumber.get(index));
       }
       {
@@ -330,7 +327,7 @@ protected void test(ParquetFileReader reader, List indexes, List
       }
       for (int i = splitPoint; i < indexes.size(); i++) {
         int index = indexes.get(i);
-        PageReadStore pages = reader.readRowGroup(blocks.get(index));
+        PageReadStore pages = reader.readRowGroup(index);
         assertValues(pages, fromNumber.get(index), toNumber.get(index));
       }
       {

From 26fb7210fee1ce2b2f62ed0ccea5d4f5c3f97ba7 Mon Sep 17 00:00:00 2001
From: Felix Schmalzel <7929762+fschmalzel@users.noreply.github.com>
Date: Wed, 24 Feb 2021 11:54:02 +0100
Subject: [PATCH 4/4] PARQUET-1982: Test invalid indexes

---
 .../hadoop/TestParquetReaderRandomAccess.java | 71 ++++++++++++-------
 1 file changed, 45 insertions(+), 26 deletions(-)

diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java
index 65c23226da..c2b5986979 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java
@@ -191,10 +191,10 @@ public void test() throws IOException {
       List<Long> fromNumber = new ArrayList<>();
       List<Long> toNumber = new ArrayList<>();

-      int blocks;
+      int blockAmount;

       try (ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(super.fsPath, configuration), options)) {
-        blocks = reader.getRowGroups().size();
+        blockAmount = reader.getRowGroups().size();
         PageReadStore pages;
         while ((pages = reader.readNextRowGroup()) != null) {
           MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(super.schema);
@@ -218,24 +218,35 @@ public void test() throws IOException {

       // Randomize indexes
       List<Integer> indexes = new ArrayList<>();
-      for (int i = 0; i < blocks; i++) {
-        for (int j = 0; j < 4; j++) {
+      for (int j = 0; j < 4; j++) {
+        for (int i = 0; i < blockAmount; i++) {
           indexes.add(i);
         }
+        indexes.add(-1);
+        indexes.add(blockAmount);
+        indexes.add(blockAmount + 1);
       }

       Collections.shuffle(indexes, random);

       try (ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(super.fsPath, configuration), options)) {
-        test(reader, indexes, fromNumber, toNumber);
+        test(reader, indexes, fromNumber, toNumber, blockAmount);
       }

       try (ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(super.fsPath, configuration), filterOptions)) {
-        testFiltered(reader, indexes, fromNumber, toNumber);
+        testFiltered(reader, indexes, fromNumber, toNumber, blockAmount);
       }
     }

-    public void assertValues(PageReadStore pages, long firstValue, long lastValue) {
+    public void assertValues(PageReadStore pages, List<Long> fromNumber, List<Long> toNumber, int index, int blockAmount) {
+      if (index < 0 || index >= blockAmount) {
+        assertNull(pages);
+        return;
+      }
+
+      long firstValue = fromNumber.get(index);
+      long lastValue = toNumber.get(index);
+
       MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(super.schema);
       RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(super.schema));
       for (long i = firstValue; i <= lastValue; i++) {
@@ -252,7 +263,15 @@ public void assertValues(PageReadStore pages, long firstValue, long lastValue) {
       assertTrue(exceptionThrown);
     }

-    public void assertFilteredValues(PageReadStore pages, long firstValue, long lastValue) {
+    public void assertFilteredValues(PageReadStore pages, List<Long> fromNumber, List<Long> toNumber, int index, int blockAmount) {
+      if (index < 0 || index >= blockAmount) {
+        assertNull(pages);
+        return;
+      }
+
+      long firstValue = fromNumber.get(index);
+      long lastValue = toNumber.get(index);
+
       MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(super.schema);
       RecordReader<Group> recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(super.schema), filter);

@@ -275,8 +294,8 @@ public void assertFilteredValues(PageReadStore pages, long firstValue, long last
       assertTrue(exceptionThrown);
     }

-    protected abstract void test(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber) throws IOException;
-    protected abstract void testFiltered(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber) throws IOException;
+    protected abstract void test(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber, int blockAmount) throws IOException;
+    protected abstract void testFiltered(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber, int blockAmount) throws IOException;
   }

   public static class DataContextRandom extends DataContext {
@@ -286,18 +305,18 @@ public DataContextRandom(long seed, File path, int blockSize, int pageSize, bool
     }

     @Override
-    protected void test(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber) throws IOException {
+    protected void test(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber, int blockAmount) throws IOException {
       for (int index: indexes) {
         PageReadStore pages = reader.readRowGroup(index);
-        assertValues(pages, fromNumber.get(index), toNumber.get(index));
+        assertValues(pages, fromNumber, toNumber, index, blockAmount);
       }
     }

     @Override
-    protected void testFiltered(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber) throws IOException {
+    protected void testFiltered(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber, int blockAmount) throws IOException {
       for (int index: indexes) {
         PageReadStore pages = reader.readFilteredRowGroup(index);
-        assertFilteredValues(pages, fromNumber.get(index), toNumber.get(index));
+        assertFilteredValues(pages, fromNumber, toNumber, index, blockAmount);
       }
     }
   }
@@ -309,58 +328,58 @@ public DataContextRandomAndSequential(long seed, File path, int blockSize, int p
     }

     @Override
-    protected void test(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber) throws IOException {
+    protected void test(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber, int blockAmount) throws IOException {
       int splitPoint = indexes.size()/2;

       {
         PageReadStore pages = reader.readNextRowGroup();
-        assertValues(pages, fromNumber.get(0), toNumber.get(0));
+        assertValues(pages, fromNumber, toNumber, 0, blockAmount);
       }
       for (int i = 0; i < splitPoint; i++) {
         int index = indexes.get(i);
         PageReadStore pages = reader.readRowGroup(index);
-        assertValues(pages, fromNumber.get(index), toNumber.get(index));
+        assertValues(pages, fromNumber, toNumber, index, blockAmount);
       }
       {
         PageReadStore pages = reader.readNextRowGroup();
-        assertValues(pages, fromNumber.get(1), toNumber.get(1));
+        assertValues(pages, fromNumber, toNumber, 1, blockAmount);
       }
       for (int i = splitPoint; i < indexes.size(); i++) {
         int index = indexes.get(i);
         PageReadStore pages = reader.readRowGroup(index);
-        assertValues(pages, fromNumber.get(index), toNumber.get(index));
+        assertValues(pages, fromNumber, toNumber, index, blockAmount);
       }
       {
         PageReadStore pages = reader.readNextRowGroup();
-        assertValues(pages, fromNumber.get(2), toNumber.get(2));
+        assertValues(pages, fromNumber, toNumber, 2, blockAmount);
       }
     }

     @Override
-    protected void testFiltered(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber) throws IOException {
+    protected void testFiltered(ParquetFileReader reader, List<Integer> indexes, List<Long> fromNumber, List<Long> toNumber, int blockAmount) throws IOException {
       int splitPoint = indexes.size()/2;

       {
         PageReadStore pages = reader.readNextFilteredRowGroup();
-        assertFilteredValues(pages, fromNumber.get(0), toNumber.get(0));
+        assertFilteredValues(pages, fromNumber, toNumber, 0, blockAmount);
       }
       for (int i = 0; i < splitPoint; i++) {
         int index = indexes.get(i);
         PageReadStore pages = reader.readFilteredRowGroup(index);
-        assertFilteredValues(pages, fromNumber.get(index), toNumber.get(index));
+        assertFilteredValues(pages, fromNumber, toNumber, index, blockAmount);
       }
       {
         PageReadStore pages = reader.readNextFilteredRowGroup();
-        assertFilteredValues(pages, fromNumber.get(1), toNumber.get(1));
+        assertFilteredValues(pages, fromNumber, toNumber, 1, blockAmount);
       }
       for (int i = splitPoint; i < indexes.size(); i++) {
         int index = indexes.get(i);
         PageReadStore pages = reader.readFilteredRowGroup(index);
-        assertFilteredValues(pages, fromNumber.get(index), toNumber.get(index));
+        assertFilteredValues(pages, fromNumber, toNumber, index, blockAmount);
       }
       {
         PageReadStore pages = reader.readNextFilteredRowGroup();
-        assertFilteredValues(pages, fromNumber.get(2), toNumber.get(2));
+        assertFilteredValues(pages, fromNumber, toNumber, 2, blockAmount);
       }
     }
   }
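
Note for reviewers (not part of the patches): a minimal usage sketch of the random-access behaviour this series introduces, assuming the patches are applied, the file contains at least one row group, and using a hypothetical path /tmp/example.parquet and class name RandomAccessExample. The null checks at the end mirror what the new tests in PATCH 4/4 assert for out-of-range indexes.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class RandomAccessExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path("/tmp/example.parquet"); // hypothetical input file

    try (ParquetFileReader reader = ParquetFileReader.open(HadoopInputFile.fromPath(path, conf))) {
      int rowGroupCount = reader.getRowGroups().size();

      // Jump to the last row group, then back to the first one, reusing the same
      // reader instead of reopening the file and re-reading the footer.
      PageReadStore last = reader.readRowGroup(rowGroupCount - 1);
      PageReadStore first = reader.readRowGroup(0);
      System.out.println("rows in last row group:  " + last.getRowCount());
      System.out.println("rows in first row group: " + first.getRowCount());

      // Out-of-range indexes are expected to return null rather than throw,
      // which is what PATCH 4/4 asserts.
      System.out.println(reader.readRowGroup(-1) == null);            // true
      System.out.println(reader.readRowGroup(rowGroupCount) == null); // true
    }
  }
}

Returning null for an invalid index keeps the indexed methods consistent with readNextRowGroup(), which also signals "no more data" with null instead of an exception.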