diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 791f9ef188..7edec333fc 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -888,20 +888,47 @@ public void appendTo(ParquetFileWriter writer) throws IOException { writer.appendRowGroups(f, blocks, true); } + /** + * Reads all the columns requested from the row group at the specified block. + * + * @param blockIndex the index of the requested block + * @throws IOException if an error occurs while reading + * @return the PageReadStore which can provide PageReaders for each column. + */ + public PageReadStore readRowGroup(int blockIndex) throws IOException { + return internalReadRowGroup(blockIndex); + } + /** * Reads all the columns requested from the row group at the current file position. * @throws IOException if an error occurs while reading * @return the PageReadStore which can provide PageReaders for each column. */ public PageReadStore readNextRowGroup() throws IOException { - if (currentBlock == blocks.size()) { + ColumnChunkPageReadStore rowGroup = internalReadRowGroup(currentBlock); + if (rowGroup == null) { return null; } - BlockMetaData block = blocks.get(currentBlock); + this.currentRowGroup = rowGroup; + // avoid re-reading bytes the dictionary reader is used after this call + if (nextDictionaryReader != null) { + nextDictionaryReader.setRowGroup(currentRowGroup); + } + + advanceToNextBlock(); + + return currentRowGroup; + } + + private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOException { + if (blockIndex < 0 || blockIndex >= blocks.size()) { + return null; + } + BlockMetaData block = blocks.get(blockIndex); if (block.getRowCount() == 0) { throw new RuntimeException("Illegal row group of 0 rows"); } - this.currentRowGroup = new ColumnChunkPageReadStore(block.getRowCount()); + ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(block.getRowCount()); // prepare the list of consecutive parts to read them in one scan List allParts = new ArrayList(); ConsecutivePartList currentParts = null; @@ -920,22 +947,54 @@ public PageReadStore readNextRowGroup() throws IOException { } } // actually read all the chunks - ChunkListBuilder builder = new ChunkListBuilder(); + ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount()); for (ConsecutivePartList consecutiveChunks : allParts) { consecutiveChunks.readAll(f, builder); } for (Chunk chunk : builder.build()) { - readChunkPages(chunk, block); + readChunkPages(chunk, block, rowGroup); } - // avoid re-reading bytes the dictionary reader is used after this call - if (nextDictionaryReader != null) { - nextDictionaryReader.setRowGroup(currentRowGroup); + return rowGroup; + } + + /** + * Reads all the columns requested from the specified row group. It may skip specific pages based on the column + * indexes according to the actual filter. As the rows are not aligned among the pages of the different columns row + * synchronization might be required. See the documentation of the class SynchronizingColumnReader for details. 
+ * + * @param blockIndex the index of the requested block + * @return the PageReadStore which can provide PageReaders for each column or null if there are no rows in this block + * @throws IOException if an error occurs while reading + */ + public PageReadStore readFilteredRowGroup(int blockIndex) throws IOException { + if (blockIndex < 0 || blockIndex >= blocks.size()) { + return null; } - advanceToNextBlock(); + // Filtering not required -> fall back to the non-filtering path + if (!options.useColumnIndexFilter() || !FilterCompat.isFilteringRequired(options.getRecordFilter())) { + return internalReadRowGroup(blockIndex); + } - return currentRowGroup; + BlockMetaData block = blocks.get(blockIndex); + if (block.getRowCount() == 0) { + throw new RuntimeException("Illegal row group of 0 rows"); + } + + RowRanges rowRanges = getRowRanges(blockIndex); + long rowCount = rowRanges.rowCount(); + if (rowCount == 0) { + // There are no matching rows -> returning null + return null; + } + + if (rowCount == block.getRowCount()) { + // All rows are matching -> fall back to the non-filtering path + return internalReadRowGroup(blockIndex); + } + + return internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(blockIndex)); } /** @@ -945,13 +1004,13 @@ public PageReadStore readNextRowGroup() throws IOException { * details. * * @return the PageReadStore which can provide PageReaders for each column - * @throws IOException - * if any I/O error occurs while reading + * @throws IOException if an error occurs while reading */ public PageReadStore readNextFilteredRowGroup() throws IOException { if (currentBlock == blocks.size()) { return null; } + // Filtering not required -> fall back to the non-filtering path if (!options.useColumnIndexFilter() || !FilterCompat.isFilteringRequired(options.getRecordFilter())) { return readNextRowGroup(); } @@ -959,7 +1018,6 @@ public PageReadStore readNextFilteredRowGroup() throws IOException { if (block.getRowCount() == 0) { throw new RuntimeException("Illegal row group of 0 rows"); } - ColumnIndexStore ciStore = getColumnIndexStore(currentBlock); RowRanges rowRanges = getRowRanges(currentBlock); long rowCount = rowRanges.rowCount(); if (rowCount == 0) { @@ -972,9 +1030,22 @@ public PageReadStore readNextFilteredRowGroup() throws IOException { return readNextRowGroup(); } - this.currentRowGroup = new ColumnChunkPageReadStore(rowRanges); + this.currentRowGroup = internalReadFilteredRowGroup(block, rowRanges, getColumnIndexStore(currentBlock)); + + // avoid re-reading bytes the dictionary reader is used after this call + if (nextDictionaryReader != null) { + nextDictionaryReader.setRowGroup(currentRowGroup); + } + + advanceToNextBlock(); + + return this.currentRowGroup; + } + + private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData block, RowRanges rowRanges, ColumnIndexStore ciStore) throws IOException { + ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges); // prepare the list of consecutive parts to read them in one scan - ChunkListBuilder builder = new ChunkListBuilder(); + ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount()); List allParts = new ArrayList(); ConsecutivePartList currentParts = null; for (ColumnChunkMetaData mc : block.getColumns()) { @@ -1005,31 +1076,24 @@ public PageReadStore readNextFilteredRowGroup() throws IOException { consecutiveChunks.readAll(f, builder); } for (Chunk chunk : builder.build()) { - readChunkPages(chunk, block); - } - - // avoid re-reading bytes the dictionary 
reader is used after this call - if (nextDictionaryReader != null) { - nextDictionaryReader.setRowGroup(currentRowGroup); + readChunkPages(chunk, block, rowGroup); } - advanceToNextBlock(); - - return currentRowGroup; + return rowGroup; } - private void readChunkPages(Chunk chunk, BlockMetaData block) throws IOException { + private void readChunkPages(Chunk chunk, BlockMetaData block, ColumnChunkPageReadStore rowGroup) throws IOException { if (null == fileDecryptor || fileDecryptor.plaintextFile()) { - currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages()); + rowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages()); return; } // Encrypted file ColumnPath columnPath = ColumnPath.get(chunk.descriptor.col.getPath()); InternalColumnDecryptionSetup columnDecryptionSetup = fileDecryptor.getColumnSetup(columnPath); if (!columnDecryptionSetup.isEncrypted()) { // plaintext column - currentRowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages()); + rowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages()); } else { // encrypted column - currentRowGroup.addColumn(chunk.descriptor.col, + rowGroup.addColumn(chunk.descriptor.col, chunk.readAllPages(columnDecryptionSetup.getMetaDataDecryptor(), columnDecryptionSetup.getDataDecryptor(), fileDecryptor.getFileAAD(), block.getOrdinal(), columnDecryptionSetup.getOrdinal())); } @@ -1080,12 +1144,19 @@ private boolean advanceToNextBlock() { * @return a DictionaryPageReadStore for the next row group */ public DictionaryPageReadStore getNextDictionaryReader() { - if (nextDictionaryReader == null && currentBlock < blocks.size()) { - this.nextDictionaryReader = getDictionaryReader(blocks.get(currentBlock)); + if (nextDictionaryReader == null) { + this.nextDictionaryReader = getDictionaryReader(currentBlock); } return nextDictionaryReader; } + public DictionaryPageReader getDictionaryReader(int blockIndex) { + if (blockIndex < 0 || blockIndex >= blocks.size()) { + return null; + } + return new DictionaryPageReader(this, blocks.get(blockIndex)); + } + public DictionaryPageReader getDictionaryReader(BlockMetaData block) { return new DictionaryPageReader(this, block); } @@ -1167,6 +1238,13 @@ private DictionaryPage readCompressedDictionary( converter.getEncoding(dictHeader.getEncoding())); } + public BloomFilterReader getBloomFilterDataReader(int blockIndex) { + if (blockIndex < 0 || blockIndex >= blocks.size()) { + return null; + } + return new BloomFilterReader(this, blocks.get(blockIndex)); + } + public BloomFilterReader getBloomFilterDataReader(BlockMetaData block) { return new BloomFilterReader(this, block); } @@ -1315,8 +1393,13 @@ private class ChunkData { private final Map map = new HashMap<>(); private ChunkDescriptor lastDescriptor; + private final long rowCount; private SeekableInputStream f; + public ChunkListBuilder(long rowCount) { + this.rowCount = rowCount; + } + void add(ChunkDescriptor descriptor, List buffers, SeekableInputStream f) { ChunkData data = map.get(descriptor); if (data == null) { @@ -1345,9 +1428,9 @@ List build() { ChunkData data = entry.getValue(); if (descriptor.equals(lastDescriptor)) { // because of a bug, the last chunk might be larger than descriptor.size - chunks.add(new WorkaroundChunk(lastDescriptor, data.buffers, f, data.offsetIndex)); + chunks.add(new WorkaroundChunk(lastDescriptor, data.buffers, f, data.offsetIndex, rowCount)); } else { - chunks.add(new Chunk(descriptor, data.buffers, data.offsetIndex)); + chunks.add(new Chunk(descriptor, data.buffers, data.offsetIndex, rowCount)); } } return 
chunks; @@ -1362,16 +1445,18 @@ private class Chunk { protected final ChunkDescriptor descriptor; protected final ByteBufferInputStream stream; final OffsetIndex offsetIndex; + final long rowCount; /** * @param descriptor descriptor for the chunk * @param buffers ByteBuffers that contain the chunk * @param offsetIndex the offset index for this column; might be null */ - public Chunk(ChunkDescriptor descriptor, List buffers, OffsetIndex offsetIndex) { + public Chunk(ChunkDescriptor descriptor, List buffers, OffsetIndex offsetIndex, long rowCount) { this.descriptor = descriptor; this.stream = ByteBufferInputStream.wrap(buffers); this.offsetIndex = offsetIndex; + this.rowCount = rowCount; } protected PageHeader readPageHeader() throws IOException { @@ -1518,7 +1603,7 @@ public ColumnChunkPageReader readAllPages(BlockCipher.Decryptor headerBlockDecry } BytesInputDecompressor decompressor = options.getCodecFactory().getDecompressor(descriptor.metadata.getCodec()); return new ColumnChunkPageReader(decompressor, pagesInChunk, dictionaryPage, offsetIndex, - blocks.get(currentBlock).getRowCount(), pageBlockDecryptor, aadPrefix, rowGroupOrdinal, columnOrdinal); + rowCount, pageBlockDecryptor, aadPrefix, rowGroupOrdinal, columnOrdinal); } private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar) { @@ -1556,8 +1641,8 @@ private class WorkaroundChunk extends Chunk { * @param descriptor the descriptor of the chunk * @param f the file stream positioned at the end of this chunk */ - private WorkaroundChunk(ChunkDescriptor descriptor, List buffers, SeekableInputStream f, OffsetIndex offsetIndex) { - super(descriptor, buffers, offsetIndex); + private WorkaroundChunk(ChunkDescriptor descriptor, List buffers, SeekableInputStream f, OffsetIndex offsetIndex, long rowCount) { + super(descriptor, buffers, offsetIndex, rowCount); this.f = f; } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java new file mode 100644 index 0000000000..c2b5986979 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReaderRandomAccess.java @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.ParquetReadOptions; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.column.ParquetProperties.WriterVersion; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.example.data.simple.SimpleGroup; +import org.apache.parquet.example.data.simple.convert.GroupRecordConverter; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.hadoop.util.HadoopInputFile; +import org.apache.parquet.io.ColumnIOFactory; +import org.apache.parquet.io.MessageColumnIO; +import org.apache.parquet.io.ParquetDecodingException; +import org.apache.parquet.io.RecordReader; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.parquet.statistics.DataGenerationContext; +import org.apache.parquet.statistics.RandomValues; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import static org.apache.parquet.filter2.predicate.FilterApi.eq; +import static org.apache.parquet.filter2.predicate.FilterApi.longColumn; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static org.apache.parquet.schema.Type.Repetition.OPTIONAL; +import static org.apache.parquet.schema.Type.Repetition.REQUIRED; +import static org.junit.Assert.*; + +/** + * This tests the random access methods of the ParquetFileReader, specifically: + *
+ * <ul>
+ *   <li>{@link ParquetFileReader#readRowGroup(int)}</li>
+ *   <li>{@link ParquetFileReader#readFilteredRowGroup(int)}</li>
+ * </ul>
+ * + * For this we use two columns. + * Column "i64" that starts at value 0 and counts up. + * Column "i64_flip" that start at value 1 and flips between 1 and 0. + * + * With these two column we can validate the read data without holding the written data in memory. + * The "i64_flip" column is mainly used to test the filtering. + * We filter "i64_flip" to be equal to one, that means all values in "i64" have to be even. + */ +public class TestParquetReaderRandomAccess { + private static final int KILOBYTE = 1 << 10; + private static final long RANDOM_SEED = 7174252115631550700L; + + @Rule + public final TemporaryFolder temp = new TemporaryFolder(); + + @Test + public void test() throws IOException { + Random random = new Random(RANDOM_SEED); + + File file = temp.newFile("test_file.parquet"); + file.delete(); + + int blockSize = 50 * KILOBYTE; + int pageSize = 2 * KILOBYTE; + + List contexts = new ArrayList<>(); + + for (boolean enableDictionary : new boolean[]{false, true}) { + for (WriterVersion writerVersion : new WriterVersion[]{WriterVersion.PARQUET_1_0, WriterVersion.PARQUET_2_0}) { + contexts.add( + new DataContextRandom(random.nextLong(), file, blockSize, + pageSize, enableDictionary, writerVersion)); + contexts.add( + new DataContextRandomAndSequential(random.nextLong(), file, blockSize, + pageSize, enableDictionary, writerVersion)); + } + } + + for (DataContext context : contexts) { + DataGenerationContext.writeAndTest(context); + } + } + + public static class SequentialLongGenerator extends RandomValues.RandomValueGenerator { + private long value= 0; + + protected SequentialLongGenerator() { + super(0L); + } + + @Override + public Long nextValue() { + return value++; + } + } + + public static class SequentialFlippingLongGenerator extends RandomValues.RandomValueGenerator { + private long value = 0; + + protected SequentialFlippingLongGenerator() { + super(0L); + } + + @Override + public Long nextValue() { + value = value == 0 ? 
1 : 0; + return value; + } + } + + public static abstract class DataContext extends DataGenerationContext.WriteContext { + + private static final int recordCount = 1_000_000; + + private final List> randomGenerators; + private final Random random; + private final FilterCompat.Filter filter; + + public DataContext(long seed, File path, int blockSize, int pageSize, boolean enableDictionary, ParquetProperties.WriterVersion version) throws IOException { + super(path, buildSchema(), blockSize, pageSize, enableDictionary, true, version); + + this.random = new Random(seed); + this.randomGenerators = Arrays.asList( + new SequentialLongGenerator(), + new SequentialFlippingLongGenerator()); + + this.filter = FilterCompat.get(eq(longColumn("i64_flip"), 1L)); + } + + private static MessageType buildSchema() { + return new MessageType("schema", + new PrimitiveType(REQUIRED, INT64, "i64"), + new PrimitiveType(REQUIRED, INT64, "i64_flip")); + } + + @Override + public void write(ParquetWriter writer) throws IOException { + for (int index = 0; index < recordCount; index++) { + Group group = new SimpleGroup(super.schema); + + for (int column = 0, columnCnt = schema.getFieldCount(); column < columnCnt; ++column) { + Type type = schema.getType(column); + RandomValues.RandomValueGenerator generator = randomGenerators.get(column); + if (type.isRepetition(OPTIONAL) && generator.shouldGenerateNull()) { + continue; + } + group.append(type.getName(), (Long) generator.nextValue()); + } + writer.write(group); + } + } + + @Override + public void test() throws IOException { + Configuration configuration = new Configuration(); + ParquetReadOptions options = ParquetReadOptions.builder().build(); + + ParquetReadOptions filterOptions = ParquetReadOptions.builder() + .copy(options) + .withRecordFilter(filter) + .useDictionaryFilter(true) + .useStatsFilter(true) + .useRecordFilter(true) + .useColumnIndexFilter(true) + .build(); + + List fromNumber = new ArrayList<>(); + List toNumber = new ArrayList<>(); + int blockAmount; + + try (ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(super.fsPath, configuration), options)) { + blockAmount = reader.getRowGroups().size(); + PageReadStore pages; + while ((pages = reader.readNextRowGroup()) != null) { + MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(super.schema); + RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(super.schema)); + long rowCount = pages.getRowCount(); + long from = recordReader.read().getLong("i64", 0); + for (int i = 1; i < rowCount - 1; i++) { + recordReader.read(); + } + Group group = recordReader.read(); + long to; + if (group == null) { + to = from; + } else { + to = group.getLong("i64", 0); + } + fromNumber.add(from); + toNumber.add(to); + } + } + + // Randomize indexes + List indexes = new ArrayList<>(); + for (int j = 0; j < 4; j++) { + for (int i = 0; i < blockAmount; i++) { + indexes.add(i); + } + indexes.add(-1); + indexes.add(blockAmount); + indexes.add(blockAmount + 1); + } + + Collections.shuffle(indexes, random); + + try (ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(super.fsPath, configuration), options)) { + test(reader, indexes, fromNumber, toNumber, blockAmount); + } + + try (ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(super.fsPath, configuration), filterOptions)) { + testFiltered(reader, indexes, fromNumber, toNumber, blockAmount); + } + } + + public void assertValues(PageReadStore pages, List fromNumber, List 
toNumber, int index, int blockAmount) { + if (index < 0 || index >= blockAmount) { + assertNull(pages); + return; + } + + long firstValue = fromNumber.get(index); + long lastValue = toNumber.get(index); + + MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(super.schema); + RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(super.schema)); + for (long i = firstValue; i <= lastValue; i++) { + Group group = recordReader.read(); + assertEquals(i, group.getLong("i64", 0)); + assertEquals((i % 2) == 0 ? 1 : 0, group.getLong("i64_flip", 0)); + } + boolean exceptionThrown = false; + try { + recordReader.read(); + } catch (ParquetDecodingException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + public void assertFilteredValues(PageReadStore pages, List fromNumber, List toNumber, int index, int blockAmount) { + if (index < 0 || index >= blockAmount) { + assertNull(pages); + return; + } + + long firstValue = fromNumber.get(index); + long lastValue = toNumber.get(index); + + MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(super.schema); + RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(super.schema), filter); + + for (long i = firstValue; i <= lastValue; i++) { + Group group = recordReader.read(); + if ((i % 2) == 0) { + assertEquals(i, group.getLong("i64", 0)); + assertEquals(1, group.getLong("i64_flip", 0)); + } else { + assertTrue(group == null || recordReader.shouldSkipCurrentRecord()); + } + } + + boolean exceptionThrown = false; + try { + recordReader.read(); + } catch (ParquetDecodingException e) { + exceptionThrown = true; + } + assertTrue(exceptionThrown); + } + + protected abstract void test(ParquetFileReader reader, List indexes, List fromNumber, List toNumber, int blockAmount) throws IOException; + protected abstract void testFiltered(ParquetFileReader reader, List indexes, List fromNumber, List toNumber, int blockAmount) throws IOException; + } + + public static class DataContextRandom extends DataContext { + + public DataContextRandom(long seed, File path, int blockSize, int pageSize, boolean enableDictionary, ParquetProperties.WriterVersion version) throws IOException { + super(seed, path, blockSize, pageSize, enableDictionary, version); + } + + @Override + protected void test(ParquetFileReader reader, List indexes, List fromNumber, List toNumber, int blockAmount) throws IOException { + for (int index: indexes) { + PageReadStore pages = reader.readRowGroup(index); + assertValues(pages, fromNumber, toNumber, index, blockAmount); + } + } + + @Override + protected void testFiltered(ParquetFileReader reader, List indexes, List fromNumber, List toNumber, int blockAmount) throws IOException { + for (int index: indexes) { + PageReadStore pages = reader.readFilteredRowGroup(index); + assertFilteredValues(pages, fromNumber, toNumber, index, blockAmount); + } + } + } + + public static class DataContextRandomAndSequential extends DataContext { + + public DataContextRandomAndSequential(long seed, File path, int blockSize, int pageSize, boolean enableDictionary, ParquetProperties.WriterVersion version) throws IOException { + super(seed, path, blockSize, pageSize, enableDictionary, version); + } + + @Override + protected void test(ParquetFileReader reader, List indexes, List fromNumber, List toNumber, int blockAmount) throws IOException { + int splitPoint = indexes.size()/2; + + { + PageReadStore pages = reader.readNextRowGroup(); + assertValues(pages, fromNumber, toNumber, 0 , 
blockAmount); + } + for (int i = 0; i < splitPoint; i++) { + int index = indexes.get(i); + PageReadStore pages = reader.readRowGroup(index); + assertValues(pages, fromNumber, toNumber, index, blockAmount); + } + { + PageReadStore pages = reader.readNextRowGroup(); + assertValues(pages, fromNumber, toNumber, 1 , blockAmount); + } + for (int i = splitPoint; i < indexes.size(); i++) { + int index = indexes.get(i); + PageReadStore pages = reader.readRowGroup(index); + assertValues(pages, fromNumber, toNumber, index, blockAmount); + } + { + PageReadStore pages = reader.readNextRowGroup(); + assertValues(pages, fromNumber, toNumber, 2 , blockAmount); + } + } + + @Override + protected void testFiltered(ParquetFileReader reader, List indexes, List fromNumber, List toNumber, int blockAmount) throws IOException { + int splitPoint = indexes.size()/2; + + { + PageReadStore pages = reader.readNextFilteredRowGroup(); + assertFilteredValues(pages, fromNumber, toNumber, 0, blockAmount); + } + for (int i = 0; i < splitPoint; i++) { + int index = indexes.get(i); + PageReadStore pages = reader.readFilteredRowGroup(index); + assertFilteredValues(pages, fromNumber, toNumber, index, blockAmount); + } + { + PageReadStore pages = reader.readNextFilteredRowGroup(); + assertFilteredValues(pages, fromNumber, toNumber, 1, blockAmount); + } + for (int i = splitPoint; i < indexes.size(); i++) { + int index = indexes.get(i); + PageReadStore pages = reader.readFilteredRowGroup(index); + assertFilteredValues(pages, fromNumber, toNumber, index, blockAmount); + } + { + PageReadStore pages = reader.readNextFilteredRowGroup(); + assertFilteredValues(pages, fromNumber, toNumber, 2, blockAmount); + } + } + } + +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/DataGenerationContext.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/DataGenerationContext.java new file mode 100644 index 0000000000..c3c61c49b6 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/DataGenerationContext.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.parquet.statistics; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.hadoop.ParquetWriter; +import org.apache.parquet.hadoop.example.GroupWriteSupport; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.schema.MessageType; + +import java.io.File; +import java.io.IOException; + +public class DataGenerationContext { + public static abstract class WriteContext { + protected final File path; + protected final Path fsPath; + protected final MessageType schema; + protected final int blockSize; + protected final int pageSize; + protected final boolean enableDictionary; + protected final boolean enableValidation; + protected final ParquetProperties.WriterVersion version; + + public WriteContext(File path, MessageType schema, int blockSize, int pageSize, boolean enableDictionary, boolean enableValidation, ParquetProperties.WriterVersion version) throws IOException { + this.path = path; + this.fsPath = new Path(path.toString()); + this.schema = schema; + this.blockSize = blockSize; + this.pageSize = pageSize; + this.enableDictionary = enableDictionary; + this.enableValidation = enableValidation; + this.version = version; + } + + public abstract void write(ParquetWriter writer) throws IOException; + + public abstract void test() throws IOException; + } + + public static void writeAndTest(WriteContext context) throws IOException { + // Create the configuration, and then apply the schema to our configuration. + Configuration configuration = new Configuration(); + GroupWriteSupport.setSchema(context.schema, configuration); + GroupWriteSupport groupWriteSupport = new GroupWriteSupport(); + + // Create the writer properties + final int blockSize = context.blockSize; + final int pageSize = context.pageSize; + final int dictionaryPageSize = pageSize; + final boolean enableDictionary = context.enableDictionary; + final boolean enableValidation = context.enableValidation; + ParquetProperties.WriterVersion writerVersion = context.version; + CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED; + + try (ParquetWriter writer = new ParquetWriter(context.fsPath, + groupWriteSupport, codec, blockSize, pageSize, dictionaryPageSize, + enableDictionary, enableValidation, writerVersion, configuration)) { + context.write(writer); + } + + context.test(); + + context.path.delete(); + } +} diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java index 191e397c14..9545aefc7d 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/RandomValues.java @@ -33,7 +33,7 @@ public class RandomValues { private static final String ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz1234567890"; - static abstract class RandomValueGenerator> implements Supplier { + public static abstract class RandomValueGenerator> implements Supplier { private final Random random; protected RandomValueGenerator(long seed) { @@ -94,7 +94,7 @@ public T get() { } } - static abstract class RandomBinaryBase> extends RandomValueGenerator { + public static abstract class RandomBinaryBase> extends RandomValueGenerator { protected final int bufferLength; protected final byte[] buffer; diff --git 
a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java index 91e1735446..bd4ae4ea28 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/statistics/TestStatistics.java @@ -20,7 +20,6 @@ package org.apache.parquet.statistics; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.ParquetProperties; import org.apache.parquet.column.impl.ColumnReaderImpl; @@ -36,8 +35,6 @@ import org.apache.parquet.format.converter.ParquetMetadataConverter; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.ParquetWriter; -import org.apache.parquet.hadoop.example.GroupWriteSupport; -import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.PrimitiveConverter; @@ -72,59 +69,6 @@ public class TestStatistics { private static final int MEGABYTE = 1 << 20; private static final long RANDOM_SEED = 1441990701846L; //System.currentTimeMillis(); - public static class DataGenerationContext { - public static abstract class WriteContext { - protected final File path; - protected final Path fsPath; - protected final MessageType schema; - protected final int blockSize; - protected final int pageSize; - protected final boolean enableDictionary; - protected final boolean enableValidation; - protected final ParquetProperties.WriterVersion version; - - public WriteContext(File path, MessageType schema, int blockSize, int pageSize, boolean enableDictionary, boolean enableValidation, ParquetProperties.WriterVersion version) throws IOException { - this.path = path; - this.fsPath = new Path(path.toString()); - this.schema = schema; - this.blockSize = blockSize; - this.pageSize = pageSize; - this.enableDictionary = enableDictionary; - this.enableValidation = enableValidation; - this.version = version; - } - - public abstract void write(ParquetWriter writer) throws IOException; - public abstract void test() throws IOException; - } - - public static void writeAndTest(WriteContext context) throws IOException { - // Create the configuration, and then apply the schema to our configuration. - Configuration configuration = new Configuration(); - GroupWriteSupport.setSchema(context.schema, configuration); - GroupWriteSupport groupWriteSupport = new GroupWriteSupport(); - - // Create the writer properties - final int blockSize = context.blockSize; - final int pageSize = context.pageSize; - final int dictionaryPageSize = pageSize; - final boolean enableDictionary = context.enableDictionary; - final boolean enableValidation = context.enableValidation; - ParquetProperties.WriterVersion writerVersion = context.version; - CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED; - - try (ParquetWriter writer = new ParquetWriter(context.fsPath, - groupWriteSupport, codec, blockSize, pageSize, dictionaryPageSize, - enableDictionary, enableValidation, writerVersion, configuration)) { - context.write(writer); - } - - context.test(); - - context.path.delete(); - } - } - public static class SingletonPageReader implements PageReader { private final DictionaryPage dict; private final DataPage data;
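Usage sketch (illustrative only, not part of the patch): how the random-access methods added above might be called by client code. The class name, file path argument, and record-printing loop below are hypothetical; only the ParquetFileReader calls come from the existing API and the methods introduced in this diff.

// Minimal sketch, assuming an existing Parquet file path is passed as args[0].
// readRowGroup(int) / readFilteredRowGroup(int) allow reading row groups in any order;
// out-of-range indexes return null, as does readFilteredRowGroup when no rows match.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.simple.convert.GroupRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.RecordReader;
import org.apache.parquet.schema.MessageType;

public class RowGroupRandomAccessExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path(args[0]); // path to an existing Parquet file (placeholder)
    ParquetReadOptions options = ParquetReadOptions.builder().build();

    try (ParquetFileReader reader =
        new ParquetFileReader(HadoopInputFile.fromPath(file, conf), options)) {
      MessageType schema = reader.getFooter().getFileMetaData().getSchema();
      int rowGroupCount = reader.getRowGroups().size();

      // Row groups no longer have to be consumed sequentially; iterate in reverse as a demo.
      for (int i = rowGroupCount - 1; i >= 0; i--) {
        PageReadStore pages = reader.readRowGroup(i); // unfiltered read of row group i
        // reader.readFilteredRowGroup(i) would additionally apply column-index filtering,
        // but only if a record filter is set on the ParquetReadOptions.
        if (pages == null) {
          continue; // out-of-range index or no matching rows
        }
        MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
        RecordReader<Group> records =
            columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
        for (long r = 0, n = pages.getRowCount(); r < n; r++) {
          Group group = records.read();
          System.out.println(group);
        }
      }
    }
  }
}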