diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
index 753bda8907..796cf17ff4 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java
@@ -43,6 +43,14 @@ public interface PageReadStore {
    */
  long getRowCount();

+  /**
+   * @return an Optional with the row index offset of this row group, or an empty Optional if the
+   * related data is not available
+   */
+  default Optional<Long> getRowIndexOffset() {
+    return Optional.empty();
+  }
+
  /**
   * Returns the indexes of the rows to be read/built if the related data is available. All the rows which index is not
   * returned shall be skipped.
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index 96980a4545..0ea75f3d86 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1400,6 +1400,31 @@ public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilte
    return readParquetMetadata(from, filter, null, false, 0);
  }

+  private Map<RowGroup, Long> generateRowGroupOffsets(FileMetaData metaData) {
+    Map<RowGroup, Long> rowGroupOrdinalToRowIdx = new HashMap<>();
+    List<RowGroup> rowGroups = metaData.getRow_groups();
+    if (rowGroups != null) {
+      long rowIdxSum = 0;
+      for (int i = 0; i < rowGroups.size(); i++) {
+        rowGroupOrdinalToRowIdx.put(rowGroups.get(i), rowIdxSum);
+        rowIdxSum += rowGroups.get(i).getNum_rows();
+      }
+    }
+    return rowGroupOrdinalToRowIdx;
+  }
+
+  /**
+   * A container for the {@link FileMetaData} and a map from each {@link RowGroup} to its ROW_INDEX offset.
+   */
+  private class FileMetaDataAndRowGroupOffsetInfo {
+    final FileMetaData fileMetadata;
+    final Map<RowGroup, Long> rowGroupToRowIndexOffsetMap;
+    public FileMetaDataAndRowGroupOffsetInfo(FileMetaData fileMetadata, Map<RowGroup, Long> rowGroupToRowIndexOffsetMap) {
+      this.fileMetadata = fileMetadata;
+      this.rowGroupToRowIndexOffsetMap = rowGroupToRowIndexOffsetMap;
+    }
+  }
+
  public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter,
      final InternalFileDecryptor fileDecryptor, final boolean encryptedFooter,
      final int combinedFooterLength) throws IOException {
@@ -1407,27 +1432,35 @@ public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilte
    final BlockCipher.Decryptor footerDecryptor = (encryptedFooter? fileDecryptor.fetchFooterDecryptor() : null);
    final byte[] encryptedFooterAAD = (encryptedFooter? AesCipher.createFooterAAD(fileDecryptor.getFileAAD()) : null);

-    FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor<FileMetaData, IOException>() {
+    FileMetaDataAndRowGroupOffsetInfo fileMetaDataAndRowGroupInfo = filter.accept(new MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {
      @Override
-      public FileMetaData visit(NoFilter filter) throws IOException {
-        return readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+      public FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException {
+        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+        return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));
      }

      @Override
-      public FileMetaData visit(SkipMetadataFilter filter) throws IOException {
-        return readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
+      public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException {
+        FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
+        return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));
      }

      @Override
-      public FileMetaData visit(OffsetMetadataFilter filter) throws IOException {
-        return filterFileMetaDataByStart(readFileMetaData(from, footerDecryptor, encryptedFooterAAD), filter);
+      public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException {
+        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+        FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter);
+        return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, generateRowGroupOffsets(fileMetadata));
      }

      @Override
-      public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
-        return filterFileMetaDataByMidpoint(readFileMetaData(from, footerDecryptor, encryptedFooterAAD), filter);
+      public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException {
+        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
+        FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter);
+        return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, generateRowGroupOffsets(fileMetadata));
      }
    });
+    FileMetaData fileMetaData = fileMetaDataAndRowGroupInfo.fileMetadata;
+    Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = fileMetaDataAndRowGroupInfo.rowGroupToRowIndexOffsetMap;

    LOG.debug("{}", fileMetaData);

    if (!encryptedFooter && null != fileDecryptor) {
@@ -1447,7 +1480,7 @@ public FileMetaData visit(RangeMetadataFilter filter) throws IOException {
      }
    }

-    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData, fileDecryptor, encryptedFooter);
+    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData, fileDecryptor, encryptedFooter, rowGroupToRowIndexOffsetMap);
    if (LOG.isDebugEnabled()) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
    return parquetMetadata;
  }
@@ -1476,6 +1509,13 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws

  public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata, InternalFileDecryptor fileDecryptor,
      boolean encryptedFooter) throws IOException {
+    return fromParquetMetadata(parquetMetadata, fileDecryptor, encryptedFooter, new HashMap<RowGroup, Long>());
+  }
+
+  public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
+      InternalFileDecryptor fileDecryptor,
+      boolean encryptedFooter,
+      Map<RowGroup, Long> rowGroupToRowIndexOffsetMap) throws IOException {
    MessageType messageType = fromParquetSchema(parquetMetadata.getSchema(), parquetMetadata.getColumn_orders());
    List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
    List<RowGroup> row_groups = parquetMetadata.getRow_groups();
@@ -1485,6 +1525,9 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata,
        BlockMetaData blockMetaData = new BlockMetaData();
        blockMetaData.setRowCount(rowGroup.getNum_rows());
        blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size());
+        if (rowGroupToRowIndexOffsetMap.containsKey(rowGroup)) {
+          blockMetaData.setRowIndexOffset(rowGroupToRowIndexOffsetMap.get(rowGroup));
+        }
        // not set in legacy files
        if (rowGroup.isSetOrdinal()) {
          blockMetaData.setOrdinal(rowGroup.getOrdinal());
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index 3d1bafe0a5..85ba98cdbb 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -248,15 +248,26 @@ public DictionaryPage readDictionaryPage() {
  private final Map<ColumnDescriptor, ColumnChunkPageReader> readers = new HashMap<>();
  private final long rowCount;
+  private final long rowIndexOffset;
  private final RowRanges rowRanges;

  public ColumnChunkPageReadStore(long rowCount) {
+    this(rowCount, -1);
+  }
+
+  ColumnChunkPageReadStore(RowRanges rowRanges) {
+    this(rowRanges, -1);
+  }
+
+  ColumnChunkPageReadStore(long rowCount, long rowIndexOffset) {
    this.rowCount = rowCount;
+    this.rowIndexOffset = rowIndexOffset;
    rowRanges = null;
  }

-  ColumnChunkPageReadStore(RowRanges rowRanges) {
+  ColumnChunkPageReadStore(RowRanges rowRanges, long rowIndexOffset) {
    this.rowRanges = rowRanges;
+    this.rowIndexOffset = rowIndexOffset;
    rowCount = rowRanges.rowCount();
  }
@@ -265,6 +276,11 @@ public long getRowCount() {
    return rowCount;
  }

+  @Override
+  public Optional<Long> getRowIndexOffset() {
+    return rowIndexOffset < 0 ? Optional.empty() : Optional.of(rowIndexOffset);
+  }
+
  @Override
  public PageReader getPageReader(ColumnDescriptor path) {
    final PageReader pageReader = readers.get(path);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
index 8ffe19f2b8..8203e9098d 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java
@@ -22,8 +22,10 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.Objects;
+import java.util.Optional;
+import java.util.PrimitiveIterator;
import java.util.Set;
+import java.util.stream.LongStream;

import org.apache.hadoop.conf.Configuration;
@@ -69,6 +71,8 @@ class InternalParquetRecordReader<T> {
  private long current = 0;
  private int currentBlock = -1;
  private ParquetFileReader reader;
+  private long currentRowIdx = -1;
+  private PrimitiveIterator.OfLong rowIdxInFileItr;
  private org.apache.parquet.io.RecordReader<T> recordReader;
  private boolean strictTypeChecking;
@@ -127,6 +131,7 @@ private void checkRead() throws IOException {
      if (pages == null) {
        throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
      }
+      resetRowIndexIterator(pages);
      long timeSpentReading = System.currentTimeMillis() - t0;
      totalTimeSpentReadingBytes += timeSpentReading;
      BenchmarkCounter.incrementTime(timeSpentReading);
@@ -227,6 +232,11 @@ public boolean nextKeyValue() throws IOException, InterruptedException {

      try {
        currentValue = recordReader.read();
+        if (rowIdxInFileItr != null && rowIdxInFileItr.hasNext()) {
+          currentRowIdx = rowIdxInFileItr.next();
+        } else {
+          currentRowIdx = -1;
+        }
      } catch (RecordMaterializationException e) {
        // this might throw, but it's fatal if it does.
        unmaterializableRecordCounter.incErrors(e);
@@ -265,4 +275,47 @@ private static <K, V> Map<K, Set<V>> toSetMultiMap(Map<K, V> map) {
    return Collections.unmodifiableMap(setMultiMap);
  }

+  /**
+   * Returns the row index of the current row. If no row has been processed or if the
+   * row index information is unavailable from the underlying {@link PageReadStore}, returns -1.
+   */
+  public long getCurrentRowIndex() {
+    if (current == 0L || rowIdxInFileItr == null) {
+      return -1;
+    }
+    return currentRowIdx;
+  }
+
+  /**
+   * Resets the row index iterator based on the row group currently being processed.
+   */
+  private void resetRowIndexIterator(PageReadStore pages) {
+    Optional<Long> rowGroupRowIdxOffset = pages.getRowIndexOffset();
+    if (!rowGroupRowIdxOffset.isPresent()) {
+      this.rowIdxInFileItr = null;
+      return;
+    }
+
+    currentRowIdx = -1;
+    final PrimitiveIterator.OfLong rowIdxInRowGroupItr;
+    if (pages.getRowIndexes().isPresent()) {
+      rowIdxInRowGroupItr = pages.getRowIndexes().get();
+    } else {
+      rowIdxInRowGroupItr = LongStream.range(0, pages.getRowCount()).iterator();
+    }
+    // Apply the row group's offset within the file to every index produced by `rowIdxInRowGroupItr`.
+    this.rowIdxInFileItr = new PrimitiveIterator.OfLong() {
+      public long nextLong() {
+        return rowGroupRowIdxOffset.get() + rowIdxInRowGroupItr.nextLong();
+      }

+      public boolean hasNext() {
+        return rowIdxInRowGroupItr.hasNext();
+      }

+      public Long next() {
+        return rowGroupRowIdxOffset.get() + rowIdxInRowGroupItr.next();
+      }
+    };
+  }
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 63a22d1321..97fe86d191 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -929,7 +929,7 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE
    if (block.getRowCount() == 0) {
      throw new RuntimeException("Illegal row group of 0 rows");
    }
-    ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(block.getRowCount());
+    ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(block.getRowCount(), block.getRowIndexOffset());
    // prepare the list of consecutive parts to read them in one scan
    List<ConsecutivePartList> allParts = new ArrayList<ConsecutivePartList>();
    ConsecutivePartList currentParts = null;
@@ -1044,7 +1044,7 @@ public PageReadStore readNextFilteredRowGroup() throws IOException {
  }

  private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData block, RowRanges rowRanges, ColumnIndexStore ciStore) throws IOException {
-    ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges);
+    ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges, block.getRowIndexOffset());
    // prepare the list of consecutive parts to read them in one scan
    ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount());
    List<ConsecutivePartList> allParts = new ArrayList<>();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index c215f5edf0..6d7672365b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -140,6 +140,16 @@ public T read() throws IOException {
    }
  }

+  /**
+   * @return the row index of the last read row. If no row has been processed, returns -1.
+   */
+  public long getCurrentRowIndex() {
+    if (reader == null) {
+      return -1;
+    }
+    return reader.getCurrentRowIndex();
+  }
+
  private void initReader() throws IOException {
    if (reader != null) {
      reader.close();
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
index 46534107a6..e46ccdd156 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java
@@ -207,6 +207,13 @@ public boolean nextKeyValue() throws IOException, InterruptedException {
    return internalReader.nextKeyValue();
  }

+  /**
+   * @return the row index of the current row. If no row has been processed, returns -1.
+   */
+  public long getCurrentRowIndex() throws IOException {
+    return internalReader.getCurrentRowIndex();
+  }
+
  private ParquetInputSplit toParquetSplit(InputSplit split) throws IOException {
    if (split instanceof ParquetInputSplit) {
      return (ParquetInputSplit) split;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
index ce204dc32e..4f9fd14f51 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java
@@ -33,6 +33,7 @@ public class BlockMetaData {
  private long totalByteSize;
  private String path;
  private int ordinal;
+  private long rowIndexOffset = -1;

  public BlockMetaData() {
  }
@@ -65,6 +66,18 @@ public void setRowCount(long rowCount) {
    this.rowCount = rowCount;
  }

+  /**
+   * @return the rowIndexOffset of this {@link BlockMetaData}, or -1 if the value is unavailable
+   */
+  public long getRowIndexOffset() { return rowIndexOffset; }
+
+  /**
+   * @param rowIndexOffset the rowIndexOffset to set
+   */
+  public void setRowIndexOffset(long rowIndexOffset) {
+    this.rowIndexOffset = rowIndexOffset;
+  }
+
  /**
   * @return the totalByteSize
   */
@@ -105,7 +118,11 @@ public long getStartingPos() {

  @Override
  public String toString() {
-    return "BlockMetaData{" + rowCount + ", " + totalByteSize + " " + columns + "}";
+    String rowIndexOffsetStr = "";
+    if (rowIndexOffset != -1) {
+      rowIndexOffsetStr = ", rowIndexOffset = " + rowIndexOffset;
+    }
+    return "BlockMetaData{" + rowCount + ", " + totalByteSize + rowIndexOffsetStr + " " + columns + "}";
  }

  /**
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
index 6355f35c3c..1e74353e2c 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java
@@ -18,6 +18,8 @@
 */
package org.apache.parquet.filter2.recordlevel;

+import static org.junit.Assert.assertEquals;
+
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -315,7 +317,7 @@ public static void write(ParquetWriter.Builder builder, List use
    }
  }

-  private static ParquetReader<Group> createReader(Path file, Filter filter) throws IOException {
+  public static ParquetReader<Group> createReader(Path file, Filter filter) throws IOException {
    Configuration conf = new Configuration();
    GroupWriteSupport.setSchema(schema, conf);
@@ -341,11 +343,24 @@ public static List<User> readFile(File f, Filter filter) throws IOException {
  }

  public static List<User> readUsers(ParquetReader.Builder<Group> builder) throws IOException {
+    return readUsers(builder, false);
+  }
+
+  /**
+   * Returns the list of users read from the given {@link ParquetReader.Builder}.
+   * If {@code validateRowIndexes} is true, this method also validates the ROW_INDEX of each row
+   * read from the ParquetReader: the ROW_INDEX of a row must be the same as the underlying user id.
+   */
+  public static List<User> readUsers(ParquetReader.Builder<Group> builder, boolean validateRowIndexes) throws IOException {
    ParquetReader<Group> reader = builder.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString()).build();

    List<User> users = new ArrayList<>();
    for (Group group = reader.read(); group != null; group = reader.read()) {
-      users.add(userFromGroup(group));
+      User u = userFromGroup(group);
+      users.add(u);
+      if (validateRowIndexes) {
+        assertEquals(reader.getCurrentRowIndex(), u.id);
+      }
    }
    return users;
  }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
index b07fccddde..68a4e34e3d 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java
@@ -200,7 +200,7 @@ private List readUsers(FilterPredicate filter, boolean use
        .useStatsFilter(useOtherFiltering)
        .useRecordFilter(useOtherFiltering)
        .useBloomFilter(useBloomFilter)
-        .useColumnIndexFilter(useOtherFiltering));
+        .useColumnIndexFilter(useOtherFiltering), true);
  }

  // Assumes that both lists are in the same order
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
index 5e181059f0..0678cbfef0 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java
@@ -248,7 +248,7 @@ private List readUsers(Filter filter, boolean useOtherFiltering, boolean u
        .useDictionaryFilter(useOtherFiltering)
        .useStatsFilter(useOtherFiltering)
        .useRecordFilter(useOtherFiltering)
-        .useColumnIndexFilter(useColumnIndexFilter));
+        .useColumnIndexFilter(useColumnIndexFilter), true);
  }

  private List<User> readUsersWithProjection(Filter filter, MessageType schema, boolean useOtherFiltering,
@@ -261,7 +261,7 @@ private List readUsersWithProjection(Filter filter, MessageType schema, bo
        .useStatsFilter(useOtherFiltering)
        .useRecordFilter(useOtherFiltering)
        .useColumnIndexFilter(useColumnIndexFilter)
-        .set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString()));
+        .set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString()), true);
  }

  private FileDecryptionProperties getFileDecryptionProperties() {
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java
new file mode 100644
index 0000000000..86f14a8628
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.in;
+import static org.apache.parquet.filter2.predicate.FilterApi.longColumn;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.nio.file.Files;
+import java.util.*;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.filter2.recordlevel.PhoneBookWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestParquetReader {
+
+  private static final Path FILE_V1 = createTempFile();
+  private static final Path FILE_V2 = createTempFile();
+  private static final Path STATIC_FILE_WITHOUT_COL_INDEXES = createPathFromCP("/test-file-with-no-column-indexes-1.parquet");
+  private static final List<PhoneBookWriter.User> DATA = Collections.unmodifiableList(makeUsers(1000));
+
+  private final Path file;
+
+  private static Path createPathFromCP(String path) {
+    try {
+      return new Path(TestParquetReader.class.getResource(path).toURI());
+    } catch (URISyntaxException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public TestParquetReader(Path file) {
+    this.file = file;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    Object[][] data = new Object[][] {
+        { FILE_V1 },
+        { FILE_V2 },
+        { STATIC_FILE_WITHOUT_COL_INDEXES } };
+    return Arrays.asList(data);
+  }
+
+  @BeforeClass
+  public static void createFiles() throws IOException {
+    writePhoneBookToFile(FILE_V1, ParquetProperties.WriterVersion.PARQUET_1_0);
+    writePhoneBookToFile(FILE_V2, ParquetProperties.WriterVersion.PARQUET_2_0);
+  }
+
+  @AfterClass
+  public static void deleteFiles() throws IOException {
+    deleteFile(FILE_V1);
+    deleteFile(FILE_V2);
+  }
+
+  private static void deleteFile(Path file) throws IOException {
+    file.getFileSystem(new Configuration()).delete(file, false);
+  }
+
+  public static List<PhoneBookWriter.User> makeUsers(int rowCount) {
+    List<PhoneBookWriter.User> users = new ArrayList<>();
+    for (int i = 0; i < rowCount; i++) {
+      PhoneBookWriter.Location location = null;
+      if (i % 3 == 1) {
+        location = new PhoneBookWriter.Location((double) i, (double) i * 2);
+      }
+      if (i % 3 == 2) {
+        location = new PhoneBookWriter.Location((double) i, null);
+      }
+      // The row index of each row in the file is the same as the user id.
+      users.add(new PhoneBookWriter.User(i, "p" + i, Arrays.asList(new PhoneBookWriter.PhoneNumber(i, "cell")), location));
+    }
+    return users;
+  }
+
+  private static Path createTempFile() {
+    try {
+      return new Path(Files.createTempFile("test-ci_", ".parquet").toAbsolutePath().toString());
+    } catch (IOException e) {
+      throw new AssertionError("Unable to create temporary file", e);
+    }
+  }
+
+  private static void writePhoneBookToFile(Path file, ParquetProperties.WriterVersion parquetVersion) throws IOException {
+    int pageSize = DATA.size() / 10;     // Ensure that several pages will be created
+    int rowGroupSize = pageSize * 6 * 5; // Ensure that more row-groups are created
+
+    PhoneBookWriter.write(ExampleParquetWriter.builder(file)
+        .withWriteMode(OVERWRITE)
+        .withRowGroupSize(rowGroupSize)
+        .withPageSize(pageSize)
+        .withWriterVersion(parquetVersion),
+      DATA);
+  }
+
+  private List<PhoneBookWriter.User> readUsers(FilterCompat.Filter filter, boolean useOtherFiltering, boolean useColumnIndexFilter)
+      throws IOException {
+    return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file)
+        .withFilter(filter)
+        .useDictionaryFilter(useOtherFiltering)
+        .useStatsFilter(useOtherFiltering)
+        .useRecordFilter(useOtherFiltering)
+        .useColumnIndexFilter(useColumnIndexFilter), true);
+  }
+
+  @Test
+  public void testCurrentRowIndex() throws Exception {
+    ParquetReader<Group> reader = PhoneBookWriter.createReader(file, FilterCompat.NOOP);
+    // Fetch the row index without processing any row.
+    assertEquals(reader.getCurrentRowIndex(), -1);
+    reader.read();
+    assertEquals(reader.getCurrentRowIndex(), 0);
+    // Calling the same API again and again should return the same result.
+    assertEquals(reader.getCurrentRowIndex(), 0);
+
+    reader.read();
+    assertEquals(reader.getCurrentRowIndex(), 1);
+    assertEquals(reader.getCurrentRowIndex(), 1);
+    long expectedCurrentRowIndex = 2L;
+    while (reader.read() != null) {
+      assertEquals(reader.getCurrentRowIndex(), expectedCurrentRowIndex);
+      expectedCurrentRowIndex++;
+    }
+    // reader.read() returned null, so the reader doesn't have any more rows.
+    assertEquals(reader.getCurrentRowIndex(), -1);
+  }
+
+  @Test
+  public void testSimpleFiltering() throws Exception {
+    Set<Long> idSet = new HashSet<>();
+    idSet.add(123L);
+    idSet.add(567L);
+    // readUsers also validates the row index for each returned row.
+    List<PhoneBookWriter.User> filteredUsers1 = readUsers(FilterCompat.get(in(longColumn("id"), idSet)), true, true);
+    assertEquals(filteredUsers1.size(), 2L);
+    List<PhoneBookWriter.User> filteredUsers2 = readUsers(FilterCompat.get(in(longColumn("id"), idSet)), true, false);
+    assertEquals(filteredUsers2.size(), 2L);
+    List<PhoneBookWriter.User> filteredUsers3 = readUsers(FilterCompat.get(in(longColumn("id"), idSet)), false, false);
+    assertEquals(filteredUsers3.size(), 1000L);
+  }
+
+  @Test
+  public void testNoFiltering() throws Exception {
+    assertEquals(DATA, readUsers(FilterCompat.NOOP, false, false));
+    assertEquals(DATA, readUsers(FilterCompat.NOOP, true, false));
+    assertEquals(DATA, readUsers(FilterCompat.NOOP, false, true));
+    assertEquals(DATA, readUsers(FilterCompat.NOOP, true, true));
+  }
+}
diff --git a/parquet-hadoop/src/test/resources/test-file-with-no-column-indexes-1.parquet b/parquet-hadoop/src/test/resources/test-file-with-no-column-indexes-1.parquet
new file mode 100644
index 0000000000..722e687ee6
Binary files /dev/null and b/parquet-hadoop/src/test/resources/test-file-with-no-column-indexes-1.parquet differ
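
For reviewers who want to see the new API end to end, here is a minimal, self-contained sketch of reading a file while tracking row indexes via the ParquetReader#getCurrentRowIndex() method added in this patch. The input path, the example class name, and the use of GroupReadSupport are illustrative assumptions, not part of the change.

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class RowIndexExample {
  public static void main(String[] args) throws Exception {
    // Illustrative input path; any existing Parquet file works here.
    Path file = new Path("/tmp/phonebook.parquet");
    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).build()) {
      // Before the first read() the reader reports -1, as documented above.
      System.out.println("row index before first read: " + reader.getCurrentRowIndex());
      for (Group record = reader.read(); record != null; record = reader.read()) {
        // getCurrentRowIndex() is the file-wide position of the record just returned,
        // computed from the row group's row index offset plus the index within the row group.
        System.out.println("row " + reader.getCurrentRowIndex() + ": " + record);
      }
    }
  }
}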