From a234145faf8bfc97e957f8b2049a176139018f96 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Fri, 11 Feb 2022 09:46:42 -0800 Subject: [PATCH 1/7] PARQUET-2117: Generate the row index in InternalParquetRecordReader and expose it via ParquetReader and ParquetRecordReader - Add and populate a rowIndexOffset field in BlockMetaData - Generate the row index in InternalParquetRecordReader and expose it via ParquetReader and ParquetRecordReader - Add new unit tests and extend all the ColumnIndexFiltering and BloomFiltering unit tests to also validate row indexes. --- .../parquet/column/page/PageReadStore.java | 8 + .../converter/ParquetMetadataConverter.java | 61 +++++- .../hadoop/ColumnChunkPageReadStore.java | 12 +- .../hadoop/InternalParquetRecordReader.java | 57 +++++- .../parquet/hadoop/ParquetFileReader.java | 4 +- .../apache/parquet/hadoop/ParquetReader.java | 10 + .../parquet/hadoop/ParquetRecordReader.java | 7 + ...xFetchedWithoutProcessingRowException.java | 30 +++ .../hadoop/RowIndexNotSupportedException.java | 31 ++++ .../hadoop/metadata/BlockMetaData.java | 14 ++ .../filter2/recordlevel/PhoneBookWriter.java | 17 +- .../parquet/hadoop/TestBloomFiltering.java | 2 +- .../hadoop/TestColumnIndexFiltering.java | 4 +- .../parquet/hadoop/TestParquetReader.java | 173 ++++++++++++++++++ 14 files changed, 409 insertions(+), 21 deletions(-) create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowIndexFetchedWithoutProcessingRowException.java create mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowIndexNotSupportedException.java create mode 100644 parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java index 753bda8907..adc5c376df 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java @@ -43,6 +43,14 @@ public interface PageReadStore { */ long getRowCount(); + /** + * + * @return the row index offset of this row group. + */ + default Optional<Long> getRowIndexOffset() { + return Optional.empty(); + } + /** * Returns the indexes of the rows to be read/built if the related data is available. All the rows which index is not * returned shall be skipped.
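The core idea running through this series is that a row group's first row index within the file is the running total of the row counts of all preceding row groups; the ParquetMetadataConverter change below builds exactly this RowGroup-to-offset map. A minimal standalone sketch of that computation follows; the class and method names (RowIndexOffsets, offsets) are illustrative only and not part of the patch.

// Sketch only: derive per-row-group row index offsets from per-row-group row counts.
// Row group i starts at the sum of the row counts of row groups 0..i-1.
import java.util.Arrays;

class RowIndexOffsets {
  static long[] offsets(long[] rowCountsPerRowGroup) {
    long[] offsets = new long[rowCountsPerRowGroup.length];
    long runningTotal = 0;
    for (int i = 0; i < rowCountsPerRowGroup.length; i++) {
      offsets[i] = runningTotal;           // first file-level row index of row group i
      runningTotal += rowCountsPerRowGroup[i];
    }
    return offsets;
  }

  public static void main(String[] args) {
    // Row groups of 100, 250 and 50 rows start at file row indexes 0, 100 and 350.
    System.out.println(Arrays.toString(offsets(new long[] {100, 250, 50})));
  }
}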
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 96980a4545..fac0698f74 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -1400,6 +1400,31 @@ public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilte return readParquetMetadata(from, filter, null, false, 0); } + private Map generateRowGroupOffsets(FileMetaData metaData) { + Map rowGroupOrdinalToRowIdx = new HashMap<>(); + List rowGroups = metaData.getRow_groups(); + if (rowGroups != null) { + long rowIdxSum = 0; + for (int i = 0; i < rowGroups.size(); i++) { + rowGroupOrdinalToRowIdx.put(rowGroups.get(i), rowIdxSum); + rowIdxSum += rowGroups.get(i).getNum_rows(); + } + } + return rowGroupOrdinalToRowIdx; + } + + /** + * A container for [[FileMetaData]] and [[RowGroup]] to ROW_INDEX offset map. + */ + private class FileMetaDataAndRowGroupOffsetInfo { + FileMetaData fileMetadata; + Map rowGroupToRowIndexOffsetMap; + public FileMetaDataAndRowGroupOffsetInfo(FileMetaData fileMetadata, Map rowGroupToRowIndexOffsetMap) { + this.fileMetadata = fileMetadata; + this.rowGroupToRowIndexOffsetMap = rowGroupToRowIndexOffsetMap; + } + } + public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilter filter, final InternalFileDecryptor fileDecryptor, final boolean encryptedFooter, final int combinedFooterLength) throws IOException { @@ -1407,27 +1432,35 @@ public ParquetMetadata readParquetMetadata(final InputStream from, MetadataFilte final BlockCipher.Decryptor footerDecryptor = (encryptedFooter? fileDecryptor.fetchFooterDecryptor() : null); final byte[] encryptedFooterAAD = (encryptedFooter? 
AesCipher.createFooterAAD(fileDecryptor.getFileAAD()) : null); - FileMetaData fileMetaData = filter.accept(new MetadataFilterVisitor() { + FileMetaDataAndRowGroupOffsetInfo fileMetaDataAndRowGroupInfo = filter.accept(new MetadataFilterVisitor() { @Override - public FileMetaData visit(NoFilter filter) throws IOException { - return readFileMetaData(from, footerDecryptor, encryptedFooterAAD); + public FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException { + FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD); + return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata)); } @Override - public FileMetaData visit(SkipMetadataFilter filter) throws IOException { - return readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD); + public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException { + FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD); + return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata)); } @Override - public FileMetaData visit(OffsetMetadataFilter filter) throws IOException { - return filterFileMetaDataByStart(readFileMetaData(from, footerDecryptor, encryptedFooterAAD), filter); + public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException { + FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD); + FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter); + return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, generateRowGroupOffsets(fileMetadata)); } @Override - public FileMetaData visit(RangeMetadataFilter filter) throws IOException { - return filterFileMetaDataByMidpoint(readFileMetaData(from, footerDecryptor, encryptedFooterAAD), filter); + public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException { + FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD); + FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter); + return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, generateRowGroupOffsets(fileMetadata)); } }); + FileMetaData fileMetaData = fileMetaDataAndRowGroupInfo.fileMetadata; + Map rowGroupToRowIndexOffsetMap = fileMetaDataAndRowGroupInfo.rowGroupToRowIndexOffsetMap; LOG.debug("{}", fileMetaData); if (!encryptedFooter && null != fileDecryptor) { @@ -1447,7 +1480,7 @@ public FileMetaData visit(RangeMetadataFilter filter) throws IOException { } } - ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData, fileDecryptor, encryptedFooter); + ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData, fileDecryptor, encryptedFooter, rowGroupToRowIndexOffsetMap); if (LOG.isDebugEnabled()) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata)); return parquetMetadata; } @@ -1476,6 +1509,13 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata, InternalFileDecryptor fileDecryptor, boolean encryptedFooter) throws IOException { + return fromParquetMetadata(parquetMetadata, fileDecryptor, encryptedFooter, generateRowGroupOffsets(parquetMetadata)); + } + + public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata, + InternalFileDecryptor fileDecryptor, + boolean encryptedFooter, + Map 
rowGroupToRowIndexOffsetMap) throws IOException { MessageType messageType = fromParquetSchema(parquetMetadata.getSchema(), parquetMetadata.getColumn_orders()); List blocks = new ArrayList(); List row_groups = parquetMetadata.getRow_groups(); @@ -1485,6 +1525,7 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata, BlockMetaData blockMetaData = new BlockMetaData(); blockMetaData.setRowCount(rowGroup.getNum_rows()); blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size()); + blockMetaData.setRowIndexOffset(rowGroupToRowIndexOffsetMap.get(rowGroup)); // not set in legacy files if (rowGroup.isSetOrdinal()) { blockMetaData.setOrdinal(rowGroup.getOrdinal()); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java index 3d1bafe0a5..162b9cdf17 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java @@ -248,15 +248,18 @@ public DictionaryPage readDictionaryPage() { private final Map readers = new HashMap(); private final long rowCount; + private final long rowIndexOffset; private final RowRanges rowRanges; - public ColumnChunkPageReadStore(long rowCount) { + public ColumnChunkPageReadStore(long rowCount, long rowIndexOffset) { this.rowCount = rowCount; + this.rowIndexOffset = rowIndexOffset; rowRanges = null; } - ColumnChunkPageReadStore(RowRanges rowRanges) { + ColumnChunkPageReadStore(RowRanges rowRanges, long rowIndexOffset) { this.rowRanges = rowRanges; + this.rowIndexOffset = rowIndexOffset; rowCount = rowRanges.rowCount(); } @@ -265,6 +268,11 @@ public long getRowCount() { return rowCount; } + @Override + public Optional getRowIndexOffset() { + return Optional.of(rowIndexOffset); + } + @Override public PageReader getPageReader(ColumnDescriptor path) { final PageReader pageReader = readers.get(path); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java index 8ffe19f2b8..4acf6796f1 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java @@ -22,8 +22,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import java.util.Objects; +import java.util.Optional; +import java.util.PrimitiveIterator; import java.util.Set; +import java.util.stream.LongStream; import org.apache.hadoop.conf.Configuration; @@ -69,6 +71,8 @@ class InternalParquetRecordReader { private long current = 0; private int currentBlock = -1; private ParquetFileReader reader; + private long currentRowIndex = -1L; + private PrimitiveIterator.OfLong rowIndexWithinFileIterator; private org.apache.parquet.io.RecordReader recordReader; private boolean strictTypeChecking; @@ -127,6 +131,7 @@ private void checkRead() throws IOException { if (pages == null) { throw new IOException("expecting more rows but reached last block. 
Read " + current + " out of " + total); } + resetRowIndexIterator(pages); long timeSpentReading = System.currentTimeMillis() - t0; totalTimeSpentReadingBytes += timeSpentReading; BenchmarkCounter.incrementTime(timeSpentReading); @@ -227,6 +232,9 @@ public boolean nextKeyValue() throws IOException, InterruptedException { try { currentValue = recordReader.read(); + if (rowIndexWithinFileIterator != null) { + currentRowIndex = rowIndexWithinFileIterator.next(); + } } catch (RecordMaterializationException e) { // this might throw, but it's fatal if it does. unmaterializableRecordCounter.incErrors(e); @@ -265,4 +273,51 @@ private static Map> toSetMultiMap(Map map) { return Collections.unmodifiableMap(setMultiMap); } + /** + * Returns the ROW_INDEX of the current row. + */ + public long getCurrentRowIndex() { + if (current == 0L) { + throw new RowIndexFetchedWithoutProcessingRowException("row index can be fetched only after processing a row"); + } + if (rowIndexWithinFileIterator == null) { + throw new RowIndexNotSupportedException("underlying page read store implementation" + + " doesn't support row index generation"); + } + return currentRowIndex; + } + + /** + * Resets the row index iterator based on the current processed row group. + */ + private void resetRowIndexIterator(PageReadStore pages) { + Optional rowIndexOffsetForCurrentRowGroup = pages.getRowIndexOffset(); + currentRowIndex = -1L; + if (rowIndexOffsetForCurrentRowGroup.isPresent()) { + final PrimitiveIterator.OfLong rowIndexWithinRowGroupIterator; + if (pages.getRowIndexes().isPresent()) { + rowIndexWithinRowGroupIterator = pages.getRowIndexes().get(); + } else { + // If `pages.getRowIndexes()` is empty, this means column indexing has not triggered. + // So start generating row indexes for each row - starting from 0. + rowIndexWithinRowGroupIterator = LongStream.range(0, pages.getRowCount()).iterator(); + } + // Adjust the row group offset in the `rowIndexWithinRowGroupIterator` iterator. 
+ this.rowIndexWithinFileIterator = new PrimitiveIterator.OfLong() { + public long nextLong() { + return rowIndexOffsetForCurrentRowGroup.get() + rowIndexWithinRowGroupIterator.nextLong(); + } + + public boolean hasNext() { + return rowIndexWithinRowGroupIterator.hasNext(); + } + + public Long next() { + return rowIndexOffsetForCurrentRowGroup.get() + rowIndexWithinRowGroupIterator.next(); + } + }; + } else { + this.rowIndexWithinFileIterator = null; + } + } } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 63a22d1321..97fe86d191 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -929,7 +929,7 @@ private ColumnChunkPageReadStore internalReadRowGroup(int blockIndex) throws IOE if (block.getRowCount() == 0) { throw new RuntimeException("Illegal row group of 0 rows"); } - ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(block.getRowCount()); + ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(block.getRowCount(), block.getRowIndexOffset()); // prepare the list of consecutive parts to read them in one scan List allParts = new ArrayList(); ConsecutivePartList currentParts = null; @@ -1044,7 +1044,7 @@ public PageReadStore readNextFilteredRowGroup() throws IOException { } private ColumnChunkPageReadStore internalReadFilteredRowGroup(BlockMetaData block, RowRanges rowRanges, ColumnIndexStore ciStore) throws IOException { - ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges); + ColumnChunkPageReadStore rowGroup = new ColumnChunkPageReadStore(rowRanges, block.getRowIndexOffset()); // prepare the list of consecutive parts to read them in one scan ChunkListBuilder builder = new ChunkListBuilder(block.getRowCount()); List allParts = new ArrayList<>(); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java index c215f5edf0..5d81801e32 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java @@ -140,6 +140,16 @@ public T read() throws IOException { } } + /** + * Returns the ROW_INDEX of the last read row. + */ + public long getCurrentRowIndex() { + if (reader == null) { + throw new RowIndexFetchedWithoutProcessingRowException("row index can be fetched only after processing a row"); + } + return reader.getCurrentRowIndex(); + } + private void initReader() throws IOException { if (reader != null) { reader.close(); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java index 46534107a6..e50c23738d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java @@ -207,6 +207,13 @@ public boolean nextKeyValue() throws IOException, InterruptedException { return internalReader.nextKeyValue(); } + /** + * Returns the ROW_INDEX of the current row. 
+ */ + public long getCurrentRowIndex() throws IOException { + return internalReader.getCurrentRowIndex(); + } + private ParquetInputSplit toParquetSplit(InputSplit split) throws IOException { if (split instanceof ParquetInputSplit) { return (ParquetInputSplit) split; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowIndexFetchedWithoutProcessingRowException.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowIndexFetchedWithoutProcessingRowException.java new file mode 100644 index 0000000000..aa4aa25ccf --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowIndexFetchedWithoutProcessingRowException.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +/** + * Exception class to signify that row index information is unavailable. + */ +public class RowIndexFetchedWithoutProcessingRowException + extends IllegalStateException { + + public RowIndexFetchedWithoutProcessingRowException(String message) { + super(message); + } +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowIndexNotSupportedException.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowIndexNotSupportedException.java new file mode 100644 index 0000000000..f19828ab40 --- /dev/null +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowIndexNotSupportedException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +/** + * Exception class to signify that row index information is unavailable. 
+ */ +public class RowIndexNotSupportedException + extends UnsupportedOperationException { + + public RowIndexNotSupportedException(String message) { + super(message); + } + +} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java index ce204dc32e..457d7a865b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java @@ -33,6 +33,7 @@ public class BlockMetaData { private long totalByteSize; private String path; private int ordinal; + private long rowIndexOffset; public BlockMetaData() { } @@ -65,6 +66,19 @@ public void setRowCount(long rowCount) { this.rowCount = rowCount; } + /** + * @return the rowIndexOffset + */ + public long getRowIndexOffset() { return rowIndexOffset; } + + /** + * @param rowIndexOffset the rowIndexOffset to set + */ + + public void setRowIndexOffset(long rowIndexOffset) { + this.rowIndexOffset = rowIndexOffset; + } + /** * @return the totalByteSize */ diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java index 6355f35c3c..a177a8a69c 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java @@ -18,6 +18,8 @@ */ package org.apache.parquet.filter2.recordlevel; +import static org.junit.Assert.assertEquals; + import java.io.File; import java.io.IOException; import java.util.ArrayList; @@ -315,7 +317,7 @@ public static void write(ParquetWriter.Builder builder, List use } } - private static ParquetReader createReader(Path file, Filter filter) throws IOException { + public static ParquetReader createReader(Path file, Filter filter) throws IOException { Configuration conf = new Configuration(); GroupWriteSupport.setSchema(schema, conf); @@ -340,12 +342,21 @@ public static List readFile(File f, Filter filter) throws IOException { return users; } - public static List readUsers(ParquetReader.Builder builder) throws IOException { + /** + * Returns a list of users from the underlying [[ParquetReader]] builder. + * If `validateRowIndexes` is set to true, this method will also validate the ROW_INDEXes for the + * rows read from ParquetReader - ROW_INDEX for a row should be same as underlying user id. 
+ */ + public static List readUsers(ParquetReader.Builder builder, boolean validateRowIndexes) throws IOException { ParquetReader reader = builder.set(GroupWriteSupport.PARQUET_EXAMPLE_SCHEMA, schema.toString()).build(); List users = new ArrayList<>(); for (Group group = reader.read(); group != null; group = reader.read()) { - users.add(userFromGroup(group)); + User u = userFromGroup(group); + users.add(u); + if (validateRowIndexes) { + assertEquals(reader.getCurrentRowIndex(), u.id); + } } return users; } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java index b07fccddde..68a4e34e3d 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestBloomFiltering.java @@ -200,7 +200,7 @@ private List readUsers(FilterPredicate filter, boolean use .useStatsFilter(useOtherFiltering) .useRecordFilter(useOtherFiltering) .useBloomFilter(useBloomFilter) - .useColumnIndexFilter(useOtherFiltering)); + .useColumnIndexFilter(useOtherFiltering), true); } // Assumes that both lists are in the same order diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java index 5e181059f0..0678cbfef0 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnIndexFiltering.java @@ -248,7 +248,7 @@ private List readUsers(Filter filter, boolean useOtherFiltering, boolean u .useDictionaryFilter(useOtherFiltering) .useStatsFilter(useOtherFiltering) .useRecordFilter(useOtherFiltering) - .useColumnIndexFilter(useColumnIndexFilter)); + .useColumnIndexFilter(useColumnIndexFilter), true); } private List readUsersWithProjection(Filter filter, MessageType schema, boolean useOtherFiltering, @@ -261,7 +261,7 @@ private List readUsersWithProjection(Filter filter, MessageType schema, bo .useStatsFilter(useOtherFiltering) .useRecordFilter(useOtherFiltering) .useColumnIndexFilter(useColumnIndexFilter) - .set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString())); + .set(ReadSupport.PARQUET_READ_SCHEMA, schema.toString()), true); } private FileDecryptionProperties getFileDecryptionProperties() { diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java new file mode 100644 index 0000000000..cedc2c7313 --- /dev/null +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.parquet.hadoop; + +import static org.apache.parquet.filter2.predicate.FilterApi.in; +import static org.apache.parquet.filter2.predicate.FilterApi.longColumn; +import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.nio.file.Files; +import java.util.*; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.example.data.Group; +import org.apache.parquet.filter2.compat.FilterCompat; +import org.apache.parquet.filter2.recordlevel.PhoneBookWriter; +import org.apache.parquet.hadoop.example.ExampleParquetWriter; +import org.apache.parquet.hadoop.example.GroupReadSupport; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class TestParquetReader { + + private static final Path FILE_V1 = createTempFile(); + private static final Path FILE_V2 = createTempFile(); + private static final List DATA = Collections.unmodifiableList(makeUsers(10000)); + + private final Path file; + + public TestParquetReader(Path file) { + this.file = file; + } + + @Parameterized.Parameters + public static Collection data() { + Object[][] data = new Object[][] { + { FILE_V1 }, + { FILE_V2 } }; + return Arrays.asList(data); + } + + @BeforeClass + public static void createFiles() throws IOException { + writePhoneBookToFile(FILE_V1, ParquetProperties.WriterVersion.PARQUET_1_0); + writePhoneBookToFile(FILE_V2, ParquetProperties.WriterVersion.PARQUET_2_0); + } + + @AfterClass + public static void deleteFiles() throws IOException { + deleteFile(FILE_V1); + deleteFile(FILE_V2); + } + + private static void deleteFile(Path file) throws IOException { + file.getFileSystem(new Configuration()).delete(file, false); + } + + public static List makeUsers(int rowCount) { + List users = new ArrayList<>(); + for (int i = 0; i < rowCount; i++) { + PhoneBookWriter.Location location = null; + if (i % 3 == 1) { + location = new PhoneBookWriter.Location((double)i, (double) i * 2); + } + if (i % 3 == 2) { + location = new PhoneBookWriter.Location((double)i, null); + } + // row index of each row in the file is same as the user id. 
+ users.add(new PhoneBookWriter.User(i, "p" + i, Arrays.asList(new PhoneBookWriter.PhoneNumber(i, "cell")), location)); + } + return users; + } + + private static Path createTempFile() { + try { + return new Path(Files.createTempFile("test-ci_", ".parquet").toAbsolutePath().toString()); + } catch (IOException e) { + throw new AssertionError("Unable to create temporary file", e); + } + } + + private static void writePhoneBookToFile(Path file, ParquetProperties.WriterVersion parquetVersion) throws IOException { + int pageSize = DATA.size() / 10; // Ensure that several pages will be created + int rowGroupSize = pageSize * 6 * 5; // Ensure that there are more row-groups created + + PhoneBookWriter.write(ExampleParquetWriter.builder(file) + .withWriteMode(OVERWRITE) + .withRowGroupSize(rowGroupSize) + .withPageSize(pageSize) + .withWriterVersion(parquetVersion), + DATA); + } + + private List readUsers(FilterCompat.Filter filter, boolean useOtherFiltering, boolean useColumnIndexFilter) + throws IOException { + return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file) + .withFilter(filter) + .useDictionaryFilter(useOtherFiltering) + .useStatsFilter(useOtherFiltering) + .useRecordFilter(useOtherFiltering) + .useColumnIndexFilter(useColumnIndexFilter), true); + } + + @Test + public void testCurrentRowIndex() throws Exception { + ParquetReader reader = PhoneBookWriter.createReader(file, FilterCompat.NOOP); + // Fetch row index without processing any row. + TestUtils.assertThrows("row index can be fetched only after processing a row", + RowIndexFetchedWithoutProcessingRowException.class, reader::getCurrentRowIndex); + reader.read(); + assertEquals(reader.getCurrentRowIndex(), 0); + // calling the same API again and again should return same result. + assertEquals(reader.getCurrentRowIndex(), 0); + + reader.read(); + assertEquals(reader.getCurrentRowIndex(), 1); + assertEquals(reader.getCurrentRowIndex(), 1); + long expectedCurrentRowIndex = 2L; + while(reader.read() != null) { + assertEquals(reader.getCurrentRowIndex(), expectedCurrentRowIndex); + expectedCurrentRowIndex++; + } + // reader.read() returned null and so reader doesn't have any more rows. + TestUtils.assertThrows("row index can be fetched only after processing a row", + RowIndexFetchedWithoutProcessingRowException.class, reader::getCurrentRowIndex); + } + + @Test + public void testSimpleFiltering() throws Exception { + Set idSet = new HashSet<>(); + idSet.add(1234l); + idSet.add(5678l); + // The readUsers also validates the rowIndex for each returned row. 
+ List filteredUsers1 = readUsers(FilterCompat.get(in(longColumn("id"), idSet)), true, true); + assertEquals(filteredUsers1.size(), 2L); + List filteredUsers2 = readUsers(FilterCompat.get(in(longColumn("id"), idSet)), true, false); + assertEquals(filteredUsers2.size(), 2L); + List filteredUsers3 = readUsers(FilterCompat.get(in(longColumn("id"), idSet)), false, false); + assertEquals(filteredUsers3.size(), 10000L); + } + + @Test + public void testNoFiltering() throws Exception { + assertEquals(DATA, readUsers(FilterCompat.NOOP, false, false)); + assertEquals(DATA, readUsers(FilterCompat.NOOP, true, false)); + assertEquals(DATA, readUsers(FilterCompat.NOOP, false, true)); + assertEquals(DATA, readUsers(FilterCompat.NOOP, true, true)); + } +} + From 23f4070e20bc8c6b8420f20c3a44df4aec7b8a67 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Tue, 15 Feb 2022 20:25:05 -0800 Subject: [PATCH 2/7] address review comments --- .../converter/ParquetMetadataConverter.java | 4 +-- .../hadoop/ColumnChunkPageReadStore.java | 12 +++++-- .../hadoop/InternalParquetRecordReader.java | 34 +++++++++---------- .../filter2/recordlevel/PhoneBookWriter.java | 4 +++ 4 files changed, 33 insertions(+), 21 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index fac0698f74..ea0f5c628a 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -1417,8 +1417,8 @@ private Map generateRowGroupOffsets(FileMetaData metaData) { * A container for [[FileMetaData]] and [[RowGroup]] to ROW_INDEX offset map. */ private class FileMetaDataAndRowGroupOffsetInfo { - FileMetaData fileMetadata; - Map rowGroupToRowIndexOffsetMap; + final FileMetaData fileMetadata; + final Map rowGroupToRowIndexOffsetMap; public FileMetaDataAndRowGroupOffsetInfo(FileMetaData fileMetadata, Map rowGroupToRowIndexOffsetMap) { this.fileMetadata = fileMetadata; this.rowGroupToRowIndexOffsetMap = rowGroupToRowIndexOffsetMap; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java index 162b9cdf17..85ba98cdbb 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java @@ -251,7 +251,15 @@ public DictionaryPage readDictionaryPage() { private final long rowIndexOffset; private final RowRanges rowRanges; - public ColumnChunkPageReadStore(long rowCount, long rowIndexOffset) { + public ColumnChunkPageReadStore(long rowCount) { + this(rowCount, -1); + } + + ColumnChunkPageReadStore(RowRanges rowRanges) { + this(rowRanges, -1); + } + + ColumnChunkPageReadStore(long rowCount, long rowIndexOffset) { this.rowCount = rowCount; this.rowIndexOffset = rowIndexOffset; rowRanges = null; @@ -270,7 +278,7 @@ public long getRowCount() { @Override public Optional getRowIndexOffset() { - return Optional.of(rowIndexOffset); + return rowIndexOffset < 0 ? 
Optional.empty() : Optional.of(rowIndexOffset); } @Override diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java index 4acf6796f1..8624be160c 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java @@ -71,8 +71,8 @@ class InternalParquetRecordReader { private long current = 0; private int currentBlock = -1; private ParquetFileReader reader; - private long currentRowIndex = -1L; - private PrimitiveIterator.OfLong rowIndexWithinFileIterator; + private long currentRowIdx = -1L; + private PrimitiveIterator.OfLong rowIdxInFileItr; private org.apache.parquet.io.RecordReader recordReader; private boolean strictTypeChecking; @@ -232,8 +232,8 @@ public boolean nextKeyValue() throws IOException, InterruptedException { try { currentValue = recordReader.read(); - if (rowIndexWithinFileIterator != null) { - currentRowIndex = rowIndexWithinFileIterator.next(); + if (rowIdxInFileItr != null) { + currentRowIdx = rowIdxInFileItr.next(); } } catch (RecordMaterializationException e) { // this might throw, but it's fatal if it does. @@ -280,44 +280,44 @@ public long getCurrentRowIndex() { if (current == 0L) { throw new RowIndexFetchedWithoutProcessingRowException("row index can be fetched only after processing a row"); } - if (rowIndexWithinFileIterator == null) { + if (rowIdxInFileItr == null) { throw new RowIndexNotSupportedException("underlying page read store implementation" + " doesn't support row index generation"); } - return currentRowIndex; + return currentRowIdx; } /** * Resets the row index iterator based on the current processed row group. */ private void resetRowIndexIterator(PageReadStore pages) { - Optional rowIndexOffsetForCurrentRowGroup = pages.getRowIndexOffset(); - currentRowIndex = -1L; - if (rowIndexOffsetForCurrentRowGroup.isPresent()) { - final PrimitiveIterator.OfLong rowIndexWithinRowGroupIterator; + Optional rowGroupRowIdxOffset = pages.getRowIndexOffset(); + currentRowIdx = -1L; + if (rowGroupRowIdxOffset.isPresent()) { + final PrimitiveIterator.OfLong rowIdxInRowGroupItr; if (pages.getRowIndexes().isPresent()) { - rowIndexWithinRowGroupIterator = pages.getRowIndexes().get(); + rowIdxInRowGroupItr = pages.getRowIndexes().get(); } else { // If `pages.getRowIndexes()` is empty, this means column indexing has not triggered. // So start generating row indexes for each row - starting from 0. - rowIndexWithinRowGroupIterator = LongStream.range(0, pages.getRowCount()).iterator(); + rowIdxInRowGroupItr = LongStream.range(0, pages.getRowCount()).iterator(); } // Adjust the row group offset in the `rowIndexWithinRowGroupIterator` iterator. 
- this.rowIndexWithinFileIterator = new PrimitiveIterator.OfLong() { + this.rowIdxInFileItr = new PrimitiveIterator.OfLong() { public long nextLong() { - return rowIndexOffsetForCurrentRowGroup.get() + rowIndexWithinRowGroupIterator.nextLong(); + return rowGroupRowIdxOffset.get() + rowIdxInRowGroupItr.nextLong(); } public boolean hasNext() { - return rowIndexWithinRowGroupIterator.hasNext(); + return rowIdxInRowGroupItr.hasNext(); } public Long next() { - return rowIndexOffsetForCurrentRowGroup.get() + rowIndexWithinRowGroupIterator.next(); + return rowGroupRowIdxOffset.get() + rowIdxInRowGroupItr.next(); } }; } else { - this.rowIndexWithinFileIterator = null; + this.rowIdxInFileItr = null; } } } diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java index a177a8a69c..1e74353e2c 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java @@ -342,6 +342,10 @@ public static List readFile(File f, Filter filter) throws IOException { return users; } + public static List readUsers(ParquetReader.Builder builder) throws IOException { + return readUsers(builder, false); + } + /** * Returns a list of users from the underlying [[ParquetReader]] builder. * If `validateRowIndexes` is set to true, this method will also validate the ROW_INDEXes for the From 9f525f5ba003026f2e4ecda76b74653df21dbdb0 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Tue, 22 Feb 2022 20:15:38 -0800 Subject: [PATCH 3/7] add test based on old parquet file without column indexes --- .../parquet/hadoop/TestParquetReader.java | 22 +++++++++++++----- ...test-file-with-no-column-indexes-1.parquet | Bin 0 -> 35855 bytes 2 files changed, 16 insertions(+), 6 deletions(-) create mode 100644 parquet-hadoop/src/test/resources/test-file-with-no-column-indexes-1.parquet diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java index cedc2c7313..59d55a5ccc 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.net.URISyntaxException; import java.nio.file.Files; import java.util.*; @@ -46,10 +47,19 @@ public class TestParquetReader { private static final Path FILE_V1 = createTempFile(); private static final Path FILE_V2 = createTempFile(); - private static final List DATA = Collections.unmodifiableList(makeUsers(10000)); + private static final Path STATIC_FILE_WITHOUT_COL_INDEXES = createPathFromCP("/test-file-with-no-column-indexes-1.parquet"); + private static final List DATA = Collections.unmodifiableList(makeUsers(1000)); private final Path file; + private static Path createPathFromCP(String path) { + try { + return new Path(TestParquetReader.class.getResource(path).toURI()); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + } + public TestParquetReader(Path file) { this.file = file; } @@ -58,7 +68,8 @@ public TestParquetReader(Path file) { public static Collection data() { Object[][] data = new Object[][] { { FILE_V1 }, - { FILE_V2 } }; + { FILE_V2 }, + { STATIC_FILE_WITHOUT_COL_INDEXES } }; return 
Arrays.asList(data); } @@ -151,15 +162,15 @@ public void testCurrentRowIndex() throws Exception { @Test public void testSimpleFiltering() throws Exception { Set idSet = new HashSet<>(); - idSet.add(1234l); - idSet.add(5678l); + idSet.add(123l); + idSet.add(567l); // The readUsers also validates the rowIndex for each returned row. List filteredUsers1 = readUsers(FilterCompat.get(in(longColumn("id"), idSet)), true, true); assertEquals(filteredUsers1.size(), 2L); List filteredUsers2 = readUsers(FilterCompat.get(in(longColumn("id"), idSet)), true, false); assertEquals(filteredUsers2.size(), 2L); List filteredUsers3 = readUsers(FilterCompat.get(in(longColumn("id"), idSet)), false, false); - assertEquals(filteredUsers3.size(), 10000L); + assertEquals(filteredUsers3.size(), 1000L); } @Test @@ -170,4 +181,3 @@ public void testNoFiltering() throws Exception { assertEquals(DATA, readUsers(FilterCompat.NOOP, true, true)); } } - diff --git a/parquet-hadoop/src/test/resources/test-file-with-no-column-indexes-1.parquet b/parquet-hadoop/src/test/resources/test-file-with-no-column-indexes-1.parquet new file mode 100644 index 0000000000000000000000000000000000000000..722e687ee63ac0c67ce237cecc6d2ea2d6883572 GIT binary patch literal 35855 zcmeI*33Oa_-SF}00z|e3AvM*ik);$Yklg(u$gr0^>}5}(Ed^Q#d)dQY_OO>d;#L(g zB5Fm%h=>&t;{sL%jT=@}jEGtl@x3$mb3tpL=kZ;hbDr}abEf@dCNsY*)9-)or0Kui z+wa%aR%;ubP@JZxIG(F77|!Bs&f&3~%j0-FPvAVx z=ZQRty*!zx@Km10(|HEZvw05B<$0`@fnr@4nNXajwY~EC8u5xNwYOa0wPtSB znLZa@$cuO}FX5%UjF)pEui%xuidXX*Ud!v)&+B;uZ{$t9nYZv(F5+#xopm+~uof?ws6{2HI)*E!5@@SFS=zs;xl48Ox={4T%8@AFwc z#~<*A9N~}nWB!Cc<gp34R%c#y;on#tP>gZ2u^h+oT#c)94Yu$NT$5{YZLY(0If1QQ zk8kAq+<;Ufi4eu$%jGKkm;1 zcpwkr!JNTEcqk9!;XHyz@+kIjCXePZoW@4j{UrzH}FQ@#G82wZ{;H1#@l%Z@8n&)oA+>l@8P|CFW<-a^8@@K7xP2>Fh9ck z_)*@^k8zM6=L7r%ALJ+bDSnzu_!&OL&+=h@j*sy39O4)FD8I!%_a0FY$MLnZM^B_(xXxuX1fTV{Cau(BASVS8uKj)tNuL@GpFYf90$E z8*9dm;aHC2c&^6PxdvPK2Cm7qxHi|}x}3mPuE#fWeQv-uPUM@|&PjYT-@?g!E8oVq zvxD#8hTMo7a}!SCrku*nxH;d+Y21Qaaw~SSivdGMjF~WH#++O8UEGG-^4;8yb#Bic zxFdJs&fJB&ayob8?%acWaxd=9eb~)?xgYoE0X&cg@nFv2Av~0a@o*l&BY6~iIFm>7 z7|!Bs&f&3~%j0-FPvAVx=ZQRty*!zx@Km10(^=)e%C+HTS88uL(`(JOp*pj`g=g_> zp2Kr_9?$0m?Bj*Jh!^t`UdqdOIT!K@UdgL?HLu~dypH|6o;UDD-o%@E3vcBj-p1Q` z2k+!vyqouMfbZeGd@tX}_wxh%AQ$sP{4hVl`}k4b&yR7CALj%71Rvxl`6+&yOZXW+ z#Lx0!evXgu^Bm$A_$a@~$M_{a&M$K*zrrW@RX)kD@hN_t!~6!n$#3!7e45YjJ6y)^ z@_YO~pXGD>0e{F5{)j*3Pxw>*jL-AuT+Uzc1^$x1;;;E4f5TD!mM`<nx3`hJm$< z)3mn7UjJU?LF209hTpq-b8V>3{K18PVoD!5%f^tewP6^5> zVL2r%r-bE{u$&T>Q^ImeSWXGcDPcJ!ET=@}l&G8%l~bZ}N>omX$|+GfB`T*x<&>zL z5|>lra!OoIiOVT*IVCQq#O0K@oD!E);&Mt-PD#osNjW7crzGW+q@0qJQ<8E@Qcg+A zDQP(+EvKaAl(d|ZmQ&JlN?J}y%PDC&B`v39<&>oTv)ye{(s^8)h%^8)h%^8)h%^8)h%^8)ii^Fs4N^Fs4N^Fs4N^Fs4N z^TNDX4PKp4oct6(b7dIc8zf2l_#)WX7l&8Jj48iFk#`+=*O7M}dDoG59eLN0cO7}x zk#`;WRATdD^J4R2^J4R2^J4R2^J4R2^AhtC^AhtC^AhtC^AhtC^AhtC^HTFt^HTFt z^HTFt^HTFt^HTFt^D^@?^D^@?^D^@?^D^@?^D^@?^K$cY^K$cY^K$cY^K$cY^K$c~ z6i6xPEawHC<-DM?oELPK^McNDUeH<23p&erL1#HH=qx|JKuUp>0x1Pj3ZxWBDUebi zr9eu7lmaOQQVOILNGXs~Af-S`fs_I%1yTy66i6wMQXr*3N`aIDDFsprq!ffcUws7n z2=o!?BhW{nk3b)RJ_3CN`Uvz9=p)cappQTwfj$C#1o{Z{5$Ge(N1%^DAAvpseFXX_ zR)S)QcyU5;@ z0x1Pj3ZxWBDUebir9eu7lmaOQQVOILNGXs~Af-S`fs_I%1yTy66i6wMQXr*JN}-fO zDTPuBr4&jjlu{_AP)ebcLMeq(3Z)cEDU?zurBF(tltL+mQVOLMN-305D5X$Jp_D=? 
zg;ENo6iO+SQYfWRN}-fODTPuBr4&jjlu{_AP)ebcLMeq(3Z)cEDU?zurBF(tltL+m zQVOLMN-305D5X$Jp_D=?g;ENo6iO+SQYfWRN}-fO4>h4aLVbk#2=x)_Bh*Kzk5C_> zK0Lb)gsE<$|p*})=g!%~e5$Yqq6=2m%L)?`WOGB84|Fkp| zmo=A$P)ebcLMeq(3Z)cEDUwnorASJVlp-lbQi`M$Nhy+2B&A47k(44SMN*2S6iF$P zQY58FN|BTzDMeC>q!dXhl2Rn4NJ^2EA}K{uilh`tDUwnorASJVlp-lbQi`M$Nhy+2 zB&A47k(44SMN*2S6iF$PQY58FN|BTzDMeC>q!dXhl2Rn4NJ^2EA}K{uilh`tDUwno zrASJVlp-lbQi`M$Nhy+2B&A47k(44SMN*2S6iF$PQY58FN|BTzDMeC>q!dXhl2Rn4 zNJ^2EA`dl@J|cZY`iS%q=_Ardq>or1u|8sb#QKQ!5$hw?N34%nAF)1SeZ=~R^%3hM z)<>+5SRb)IiWOkhOG6Sx#nKRW<^Qxa6qhxZhFD6mlwv8xQi`P%ODUF8ETvdVv6Nyd z#Zro;6iX?VQY@udO0kq;DaBHXr4&mkmQpOGSW2;!VkyN^ilr1wDV9G77k?14QN1~5JABjE^ zeI)uw^pWTz(MO_>L?4Mh5`854Nc55DBhg2qk3=7dK8h7!)k{N~b{0!R62Lb-h zs*hA3sXmGopjaB7TC+G!Yg^p-OFrLg$l}IPLz;HIwlpmEy@ukl=F*T#DV0(xrBq6( zlu{|BQc9(iN-336Dy39PsgzPFrBX_zlu9X;QYxiXN~x4mDWy_MrIbo3l~O9DR7#nY zGAU(J%A}M@DU(trrA$hhlrkx0Qp%*1Nhy<3CZ$YDnUpdqWm3wdlu0R*QYNKLN|}^0 zDP>a1q?Ac1lTs$7OiG!QGAU(J%A}M@DU(trrA$hhlrkx0Qp%*1Nhy<3CZ$YDnUpdq zWm3wdlu0R*Qt>CY_~ZNhgyJ-h(D%A8ws8*a<(Sm*YoeU&@7d`Ir&?Ug&b zd>8KO?UmDAz8iP<_R2k6z9;wc_R76oz7M-g>l|WjU$^5B>-)K!wf)_mL#!R(dK_Z? zK$o+2klS;JwS!%cL#)qmIctZwJ%?C3)b%*T`e80-?Qpl}5Nk)c9*0;z(&elj<@Ow6 zt;gl;;Q)tNn^|73gFWoy5J%W?w7Kl#07qCmre15j)Y-!U4zV_?yj}-;*vBD`uw!<4 z{W|+Nz!BEwxE_lY;PvI@I103QoM_D_~$H`81vxj}`=OBkT!cn%K zZZ7NWW-t3Vz(Edkgtaq#{_JF(JuFs$*O!OqS6UvH^;OG5{~7g(XVfeY|2qc{Q;QoF zFU9Sf4seLW9A)jy@_HTYWH)=*$9@iSh$9?j>jHCGXE%G<#{mv|{54*vEbja)=`wW$QWSvd(VyvX28CgF5?JCxq>Yhl^<^_J2;gY>+I$X&SfwA zxR8rDz(FqIQVw&3%ejKJi|e(;ao0YAN#q8i#f<4F6AXnWj2CuA^hlQ8b zC(bDT*lpDK^yVCv7c6P9n8K<+GJ)Fx0>|;L{aWRVu zUO&JdT&;M$`JLvLTH~eYz54cCtv>OIam7UkH-5EWHuJyX)Oe4@OTGDft%K7TackDO zBX{9$+>`ro-_pMQoIk+%1D!wE`5De1>il8OAL0Cw&i9m7-><2@FH=9-+mG>fR^K0$NEsKYau>X}aU zjHY_#QawYddFIf=>X}7{X8_eRh3XkWb>v?i;ny5__pmxL?>N8w`c+5T)e&`dQ#qy)uCJKMdjQ&t3$2<=c_}gnnRIJR);>- zAx?EDQysE&I25Q3L8?QI>X4#3bZGTZTpbD&OF*$EyfC3SO>28_rS+lj=4yQ?9{DFe z)mR@6Y5e-{SRH1zuU?EUUaEtP>R_UFS-p9AC#!>i>XEs61g;)wJ3N9_kDS#bWc5f` zJ>s=`1gRdusz<8o5vh9Qsae50Sv_J@j||l#K(*3WD|)qZS1WX}5;s4O5w>1cp0Bf) z0~}`U>hgM>>|sBLILeM|%IkNtkAoax>$ToLdpW>i)~@sZ*~5Mgag?q7<@Kks&Kd0G zLJn{Vhq;`!>&toV?BsOzZ~^iez0v2-I%lw# z3pv0g9OiP?Zu0rFlhfJ51?=Zy4sjVr*>bbbpBcM3m&GDbtPoGFQJki=J+ab3_Tt;C z6=KoN^@-1psnu%lZ2bC{4<8!0DE>j7_ch3+9N`MK-coN~e=6&o!Co%p0GDu>%UQd% zoY&4yPG=7nu%C-L#AO_1%c62_2QzkaF8jEMgIvlHu3+nJK7ZCZgS}kH0WRS%m$P=e z&!3%~&K@pcKa0D)_%m^JYjJw*pHr>=A8)GKc;F~DE-4vXoN2z4cYUo^UC?-Q<7IqF zZ8dsasIB3AdF)!Ny}|i4xfa*vI$W0%*vj?zMy}5d*v5%`6Wck7Z{}M#nQ!IW_;z;i z9o&!`abs@6DcqD(xfwTSBfQ4Tw36Bu+>%?dlU)oLGGfexDKqBWn(yK^+?Ma=cC2%I z?!X?88Z|=ix?#unSKM&x6JctK#1{;S$jh919YKQS~9>F7d z6ni+6NAnoY;%v_0v7F1}csx(wJkIBdJc+$LnWyknp2pL82G8UIp2f3y4$tLzJfFpi z@ydka^xD6!YOENqc;iZ2tTxtySIQORl`&&4aHT$8$cuO}FX5%UjF)pEui%xuidXX* zUd!v)&+B;uZ{$t9nYZv(F5+#xopm+~uof?ws6 z{2HI)*E!5@@SFS=zs;xl48Ox={4T%8@AFwc#~<*A9N~}nWB!Cc<vIFPaU$Quc244(`4&#*TlqG= zogI7!H{?d#n4544H|11r#?ARoPU9Bbl3THpT?`m9V$6goGv?fy@8UMxmha|vtaE$r zz#X|0cjhkKmD9N!cjq45lY4P*?!#{G%l)`N58#13hzD~958?EgK2PLH?B&Tkg{Sf~p3XCPCKvE5R*S(ZSBe#`w9@vg|F}|| z<4Wi9Jf6=B*vAWb5ijN?yp)&maxUZ*ypmV(YF@)@c^&(CJ#XNRyoopS7T(H5yp6Z> z4&KSTcsK9i0N=xV`Ch({@8<{jK`!Qp_+fs8_wl2=pC98OKh6jE2|mbA@>BdYm+&)u zh@a)d{2U+Q=Q+eL@KJt|kMT=ircD!%T`{k;mCJA4&3_xLJ|T(7y$S8=|%(^p}0udl-9ZeNAD>ut?< zaT{*ScXK<|xjlE_j@*ema~JN)>D-OGa}Vyxy|_2`VK?{Xe%zl2@IW5KgE@nT@K7Gc z!+8Xc7?_c~E0bC|=rIe(&lN8)qAr7RQPuW2-m)!Bzgq75o$b z%)jsz{*|xtZ*0yl-qXLgZEbwbYqfuG+gg4dcJzmGC;pGywsm}4Yg_A{ZDZTUmwy3x z%^=k1)zWLufHg8h=CvN}AO^PG; z#!X%+ZqoeU-u}P4NwIJAfAZ068kK|G8;Y=RQV>pYmIfut`E|25!Jc08#pC|Gp_VQ$&!c%z~Pv;ptlM8qj&*nKi 
zm*??(Ucf$H$cuO}FX5%UjF)pEui%xuidXX*Ud!v)&+B;uZ{$t9nYZv(F5+#xopm+~uY{^V}4pncW(C;2rte{!#Q``4Wx<~R6Fev98` zwG0$%#K73%G_7sp;??s1&;7aUw6{FtwdN90o%xOnm+`y&9>33L`5b@1A9930;*a?g z{**uC^Q=C+;^SS#uD^NKRcy17#jo$cQ|UWVZRpZ1a)X<|DJsM`oLk z%r+mHZ9X#Fd}Oxy$ZYeG+2$j&%|~XNkIXh7nQcBY+k9lU`N(YZk=f=Wv&~0ln~%&k zADL}FGTVG)w)x0x^O4!+Be%^*ZkvzXHXpfdK62Z9>m%1ku@V$Z#C>CnQ@Og?h$k=p z%YgB{c@{KRwZ^lnjmvGpi;J2IL@uOUNV$-5A>~5Kg_H{^7g8>yTu8Z)av|kH%7v5* zDHl@l0Tv(lGiw*8X>Iqe^kbQw*Hu53Ido-x;?c!NziH#wzq08{alOV%al58Vxs26M zFONFEf-P5-*ROu6xx@LX%vk-jbGP#|IG5E=LH9Yokc(LTbo8L}OSqK79N}`VVD;0~ ztyh;HZ#$>5`YG$W^V2zl)lXmdI=_Gm+0Owk<`NEZn9I1FqpV$1eta!#XZ6$Foz7>R z&g!SWdz_!k1?*!#7jZGGpAsK(ekqr6gri)+mTTSr?BG;pth1XlIG5E=q4znzkc(LT zbo!w4OSqKPPpywQznm*r{WN>)b?$#oWl>HetJRCsKW8=dp4YOvy*{z|H-XKr{*|A0 zu9YuCx0mmC8AsW2M|pV%Gj?+>`?!dMT*?uyVC$Xb+^MW{279@X16;ykE@$noa$Y+- zIh{RRz&9Y1drrV?BPrv&0{!=vpI*y^54&Ui@T+G`S{HrWYHi~;skM(iZdR>r?M=qEk3Vtd{Mnn-YVp|imU$-~GxN0LPCBvp z&TreA#oNZsJL$ygwV9_iuCjLT+>=h6z4z(!kDh(XscmCx@%Z-Dnin;0(wMvYiOsi} zeZp}k&MIb49G);~+N60?S|+!RpR`F!ZBpaix78-zzs|&ECl#kDE!+J4U)Gd=l8T$v zYLg}xhXA#;8xd*UN|Pbscdd=SldUHtRF>fd4)TO~d;zWONEYRxK=go^?#RDEB%t$E`mzbkJ1AKjao z2LIQ0X)*T2F~w<0asB`NE^T}zCqB6E#NzMtCrxf&@61z{|P7y)#ca`SjVRZ886pjx&n%i$$WNYs-Ag&W@=^pMKoDSslgy^N|Fz xgJX(4W4q#6oyGS~X3d-xbk50UcV%;eS;?Gy&e5|v!(%q9U0uA}?(O&M`d?!$hfDwf literal 0 HcmV?d00001 From 1ecd3f01d7be29aa0233c6e4e7c8b3af6c4af372 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Fri, 25 Feb 2022 09:50:48 -0800 Subject: [PATCH 4/7] address review comments - Return -1 when row index info not available, document the same, Return -1 when rowIndexOffset info not available in BlockMetadata --- .../parquet/column/page/PageReadStore.java | 4 +-- .../converter/ParquetMetadataConverter.java | 6 ++-- .../hadoop/InternalParquetRecordReader.java | 21 ++++++------- .../apache/parquet/hadoop/ParquetReader.java | 4 +-- ...xFetchedWithoutProcessingRowException.java | 30 ------------------ .../hadoop/RowIndexNotSupportedException.java | 31 ------------------- .../hadoop/metadata/BlockMetaData.java | 11 ++++--- .../parquet/hadoop/TestParquetReader.java | 6 ++-- 8 files changed, 26 insertions(+), 87 deletions(-) delete mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowIndexFetchedWithoutProcessingRowException.java delete mode 100644 parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowIndexNotSupportedException.java diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java index adc5c376df..796cf17ff4 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageReadStore.java @@ -44,8 +44,8 @@ public interface PageReadStore { long getRowCount(); /** - * - * @return the row index offset of this row group. 
+ * @return the optional of the long representing the row index offset of this row group or an empty optional if the + * related data is not available */ default Optional getRowIndexOffset() { return Optional.empty(); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index ea0f5c628a..0ea75f3d86 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -1509,7 +1509,7 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata, InternalFileDecryptor fileDecryptor, boolean encryptedFooter) throws IOException { - return fromParquetMetadata(parquetMetadata, fileDecryptor, encryptedFooter, generateRowGroupOffsets(parquetMetadata)); + return fromParquetMetadata(parquetMetadata, fileDecryptor, encryptedFooter, new HashMap()); } public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata, @@ -1525,7 +1525,9 @@ public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata, BlockMetaData blockMetaData = new BlockMetaData(); blockMetaData.setRowCount(rowGroup.getNum_rows()); blockMetaData.setTotalByteSize(rowGroup.getTotal_byte_size()); - blockMetaData.setRowIndexOffset(rowGroupToRowIndexOffsetMap.get(rowGroup)); + if (rowGroupToRowIndexOffsetMap.containsKey(rowGroup)) { + blockMetaData.setRowIndexOffset(rowGroupToRowIndexOffsetMap.get(rowGroup)); + } // not set in legacy files if (rowGroup.isSetOrdinal()) { blockMetaData.setOrdinal(rowGroup.getOrdinal()); diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java index 8624be160c..7f6f29b2dc 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java @@ -71,7 +71,7 @@ class InternalParquetRecordReader { private long current = 0; private int currentBlock = -1; private ParquetFileReader reader; - private long currentRowIdx = -1L; + private long currentRowIdx = -1; private PrimitiveIterator.OfLong rowIdxInFileItr; private org.apache.parquet.io.RecordReader recordReader; private boolean strictTypeChecking; @@ -232,8 +232,10 @@ public boolean nextKeyValue() throws IOException, InterruptedException { try { currentValue = recordReader.read(); - if (rowIdxInFileItr != null) { + if (rowIdxInFileItr != null && rowIdxInFileItr.hasNext()) { currentRowIdx = rowIdxInFileItr.next(); + } else { + currentRowIdx = -1; } } catch (RecordMaterializationException e) { // this might throw, but it's fatal if it does. @@ -274,15 +276,12 @@ private static Map> toSetMultiMap(Map map) { } /** - * Returns the ROW_INDEX of the current row. + * Returns the row index of the current row. If no row has been processed or if the + * row index information is unavailable from the underlying @{@link PageReadStore}, returns -1. 
*/ public long getCurrentRowIndex() { - if (current == 0L) { - throw new RowIndexFetchedWithoutProcessingRowException("row index can be fetched only after processing a row"); - } - if (rowIdxInFileItr == null) { - throw new RowIndexNotSupportedException("underlying page read store implementation" + - " doesn't support row index generation"); + if (current == 0L || rowIdxInFileItr == null) { + return -1; } return currentRowIdx; } @@ -292,14 +291,12 @@ public long getCurrentRowIndex() { */ private void resetRowIndexIterator(PageReadStore pages) { Optional rowGroupRowIdxOffset = pages.getRowIndexOffset(); - currentRowIdx = -1L; + currentRowIdx = -1; if (rowGroupRowIdxOffset.isPresent()) { final PrimitiveIterator.OfLong rowIdxInRowGroupItr; if (pages.getRowIndexes().isPresent()) { rowIdxInRowGroupItr = pages.getRowIndexes().get(); } else { - // If `pages.getRowIndexes()` is empty, this means column indexing has not triggered. - // So start generating row indexes for each row - starting from 0. rowIdxInRowGroupItr = LongStream.range(0, pages.getRowCount()).iterator(); } // Adjust the row group offset in the `rowIndexWithinRowGroupIterator` iterator. diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java index 5d81801e32..3edd6d39f3 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java @@ -141,11 +141,11 @@ public T read() throws IOException { } /** - * Returns the ROW_INDEX of the last read row. + * Returns the row index of the last read row. If no row has been processed, returns -1. */ public long getCurrentRowIndex() { if (reader == null) { - throw new RowIndexFetchedWithoutProcessingRowException("row index can be fetched only after processing a row"); + return -1; } return reader.getCurrentRowIndex(); } diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowIndexFetchedWithoutProcessingRowException.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowIndexFetchedWithoutProcessingRowException.java deleted file mode 100644 index aa4aa25ccf..0000000000 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowIndexFetchedWithoutProcessingRowException.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.hadoop; - -/** - * Exception class to signify that row index information is unavailable. 
- */ -public class RowIndexFetchedWithoutProcessingRowException - extends IllegalStateException { - - public RowIndexFetchedWithoutProcessingRowException(String message) { - super(message); - } -} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowIndexNotSupportedException.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowIndexNotSupportedException.java deleted file mode 100644 index f19828ab40..0000000000 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/RowIndexNotSupportedException.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.hadoop; - -/** - * Exception class to signify that row index information is unavailable. - */ -public class RowIndexNotSupportedException - extends UnsupportedOperationException { - - public RowIndexNotSupportedException(String message) { - super(message); - } - -} diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java index 457d7a865b..4f9fd14f51 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/BlockMetaData.java @@ -33,7 +33,7 @@ public class BlockMetaData { private long totalByteSize; private String path; private int ordinal; - private long rowIndexOffset; + private long rowIndexOffset = -1; public BlockMetaData() { } @@ -67,14 +67,13 @@ public void setRowCount(long rowCount) { } /** - * @return the rowIndexOffset + * @return -1 if the rowIndexOffset for the {@link BlockMetaData} is unavailable else returns the actual rowIndexOffset */ public long getRowIndexOffset() { return rowIndexOffset; } /** * @param rowIndexOffset the rowIndexOffset to set */ - public void setRowIndexOffset(long rowIndexOffset) { this.rowIndexOffset = rowIndexOffset; } @@ -119,7 +118,11 @@ public long getStartingPos() { @Override public String toString() { - return "BlockMetaData{" + rowCount + ", " + totalByteSize + " " + columns + "}"; + String rowIndexOffsetStr = ""; + if (rowIndexOffset != -1) { + rowIndexOffsetStr = ", rowIndexOffset = " + rowIndexOffset; + } + return "BlockMetaData{" + rowCount + ", " + totalByteSize + rowIndexOffsetStr + " " + columns + "}"; } /** diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java index 59d55a5ccc..86f14a8628 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java @@ -139,8 +139,7 @@ private List 
readUsers(FilterCompat.Filter filter, boolean public void testCurrentRowIndex() throws Exception { ParquetReader reader = PhoneBookWriter.createReader(file, FilterCompat.NOOP); // Fetch row index without processing any row. - TestUtils.assertThrows("row index can be fetched only after processing a row", - RowIndexFetchedWithoutProcessingRowException.class, reader::getCurrentRowIndex); + assertEquals(reader.getCurrentRowIndex(), -1); reader.read(); assertEquals(reader.getCurrentRowIndex(), 0); // calling the same API again and again should return same result. @@ -155,8 +154,7 @@ public void testCurrentRowIndex() throws Exception { expectedCurrentRowIndex++; } // reader.read() returned null and so reader doesn't have any more rows. - TestUtils.assertThrows("row index can be fetched only after processing a row", - RowIndexFetchedWithoutProcessingRowException.class, reader::getCurrentRowIndex); + assertEquals(reader.getCurrentRowIndex(), -1); } @Test From 760dada46dcf18de73c74935a3eb226aefded1e6 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Mon, 7 Mar 2022 09:13:36 -0800 Subject: [PATCH 5/7] address review comments - Fix java doc style --- .../src/main/java/org/apache/parquet/hadoop/ParquetReader.java | 2 +- .../java/org/apache/parquet/hadoop/ParquetRecordReader.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java index 3edd6d39f3..6d7672365b 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java @@ -141,7 +141,7 @@ public T read() throws IOException { } /** - * Returns the row index of the last read row. If no row has been processed, returns -1. + * @return the row index of the last read row. If no row has been processed, returns -1. */ public long getCurrentRowIndex() { if (reader == null) { diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java index e50c23738d..e46ccdd156 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetRecordReader.java @@ -208,7 +208,7 @@ public boolean nextKeyValue() throws IOException, InterruptedException { } /** - * Returns the ROW_INDEX of the current row. + * @return the row index of the current row. If no row has been processed, returns -1. 
*/ public long getCurrentRowIndex() throws IOException { return internalReader.getCurrentRowIndex(); From d58bc95e2bb056cafb2b182a18c9f8cc80fd85b4 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Sun, 13 Mar 2022 10:04:01 -0700 Subject: [PATCH 6/7] address review comments from ggershinsky - early return and reduce indentation --- .../hadoop/InternalParquetRecordReader.java | 43 ++++++++++--------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java index 7f6f29b2dc..182f5d8eee 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java @@ -291,30 +291,31 @@ public long getCurrentRowIndex() { */ private void resetRowIndexIterator(PageReadStore pages) { Optional rowGroupRowIdxOffset = pages.getRowIndexOffset(); + if (rowGroupRowIdxOffset.isEmpty()) { + this.rowIdxInFileItr = null; + return; + } + currentRowIdx = -1; - if (rowGroupRowIdxOffset.isPresent()) { - final PrimitiveIterator.OfLong rowIdxInRowGroupItr; - if (pages.getRowIndexes().isPresent()) { - rowIdxInRowGroupItr = pages.getRowIndexes().get(); - } else { - rowIdxInRowGroupItr = LongStream.range(0, pages.getRowCount()).iterator(); + final PrimitiveIterator.OfLong rowIdxInRowGroupItr; + if (pages.getRowIndexes().isPresent()) { + rowIdxInRowGroupItr = pages.getRowIndexes().get(); + } else { + rowIdxInRowGroupItr = LongStream.range(0, pages.getRowCount()).iterator(); + } + // Adjust the row group offset in the `rowIndexWithinRowGroupIterator` iterator. + this.rowIdxInFileItr = new PrimitiveIterator.OfLong() { + public long nextLong() { + return rowGroupRowIdxOffset.get() + rowIdxInRowGroupItr.nextLong(); } - // Adjust the row group offset in the `rowIndexWithinRowGroupIterator` iterator. - this.rowIdxInFileItr = new PrimitiveIterator.OfLong() { - public long nextLong() { - return rowGroupRowIdxOffset.get() + rowIdxInRowGroupItr.nextLong(); - } - public boolean hasNext() { - return rowIdxInRowGroupItr.hasNext(); - } + public boolean hasNext() { + return rowIdxInRowGroupItr.hasNext(); + } - public Long next() { - return rowGroupRowIdxOffset.get() + rowIdxInRowGroupItr.next(); - } - }; - } else { - this.rowIdxInFileItr = null; - } + public Long next() { + return rowGroupRowIdxOffset.get() + rowIdxInRowGroupItr.next(); + } + }; } } From 015f967e33ebf45d18c64883816410815c0cd722 Mon Sep 17 00:00:00 2001 From: Prakhar Jain Date: Fri, 18 Mar 2022 23:04:03 -0700 Subject: [PATCH 7/7] fix build --- .../org/apache/parquet/hadoop/InternalParquetRecordReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java index 182f5d8eee..8203e9098d 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordReader.java @@ -291,7 +291,7 @@ public long getCurrentRowIndex() { */ private void resetRowIndexIterator(PageReadStore pages) { Optional rowGroupRowIdxOffset = pages.getRowIndexOffset(); - if (rowGroupRowIdxOffset.isEmpty()) { + if (!rowGroupRowIdxOffset.isPresent()) { this.rowIdxInFileItr = null; return; }
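For reference, a minimal usage sketch of the row-index API added by this series (illustrative only, not part of any commit above): it assumes a Parquet file at a placeholder path data.parquet that is readable through the bundled example GroupReadSupport, and it relies on the -1 sentinel documented on ParquetReader#getCurrentRowIndex (returned before the first read() and again once the reader is exhausted).

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class RowIndexExample {
  public static void main(String[] args) throws Exception {
    // Placeholder path; any Parquet file readable as example Groups would do.
    Path file = new Path("data.parquet");
    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).build()) {
      // No row has been processed yet, so the sentinel -1 is returned.
      System.out.println(reader.getCurrentRowIndex()); // -1
      for (Group record = reader.read(); record != null; record = reader.read()) {
        // File-level index of the row that read() just returned; the row group's
        // starting offset and any column-index filtering are already reflected.
        long rowIndex = reader.getCurrentRowIndex();
        System.out.println(rowIndex + ": " + record);
      }
      // read() returned null, the reader is exhausted, and -1 is returned again.
      System.out.println(reader.getCurrentRowIndex()); // -1
    }
  }
}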