From b52ae80f7c8ca6126da1803e03f5831e6a041ad4 Mon Sep 17 00:00:00 2001 From: Ala Luszczak Date: Mon, 20 Jun 2022 12:41:39 +0200 Subject: [PATCH 1/2] PARQUET-2161: Fix row index generation The row indexes introduced in PARQUET-2117 are not computed correctly when: (1) a range or offset metadata filter is applied, and (2) the first row group is eliminated by the filter. For example, if a file has two row groups with 10 rows each, and we attempt to only read the 2nd row group, we are going to produce row indexes 0, 1, 2, ..., 9 instead of the expected 10, 11, ..., 19. This happens because the functions `filterFileMetaDataByStart` and `filterFileMetaDataByMidpoint` modify their input `FileMetaData`. To return the correct result, `generateRowGroupOffsets` has to be called before these filters are applied. --- .../converter/ParquetMetadataConverter.java | 8 +++++-- .../filter2/recordlevel/PhoneBookWriter.java | 2 +- .../parquet/hadoop/TestParquetReader.java | 21 +++++++++++++++++-- 3 files changed, 26 insertions(+), 5 deletions(-) diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java index 77856eea53..4409d5f689 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java @@ -1450,15 +1450,19 @@ public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws @Override public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException { FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD); + // We must generate the map *before* filtering because it modifies `fileMetadata`. 
+ Map rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata); FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter); - return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, generateRowGroupOffsets(fileMetadata)); + return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, rowGroupToRowIndexOffsetMap); } @Override public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException { FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD); + // We must generate the map *before* filtering because it modifies `fileMetadata`. + Map rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata); FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter); - return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, generateRowGroupOffsets(fileMetadata)); + return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, rowGroupToRowIndexOffsetMap); } }); FileMetaData fileMetaData = fileMetaDataAndRowGroupInfo.fileMetadata; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java index 1e74353e2c..8a8600987f 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java @@ -359,7 +359,7 @@ public static List readUsers(ParquetReader.Builder builder, boolean User u = userFromGroup(group); users.add(u); if (validateRowIndexes) { - assertEquals(reader.getCurrentRowIndex(), u.id); + assertEquals("validating row index", u.id, reader.getCurrentRowIndex()); } } return users; diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java index 
86f14a8628..2cb1f54528 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetReader.java @@ -51,6 +51,7 @@ public class TestParquetReader { private static final List DATA = Collections.unmodifiableList(makeUsers(1000)); private final Path file; + private final long fileSize; private static Path createPathFromCP(String path) { try { @@ -60,8 +61,9 @@ private static Path createPathFromCP(String path) { } } - public TestParquetReader(Path file) { + public TestParquetReader(Path file) throws IOException { this.file = file; + this.fileSize = file.getFileSystem(new Configuration()).getFileStatus(file).getLen(); } @Parameterized.Parameters @@ -126,13 +128,19 @@ private static void writePhoneBookToFile(Path file, ParquetProperties.WriterVers } private List readUsers(FilterCompat.Filter filter, boolean useOtherFiltering, boolean useColumnIndexFilter) + throws IOException { + return readUsers(filter, useOtherFiltering, useColumnIndexFilter, 0, this.fileSize); + } + + private List readUsers(FilterCompat.Filter filter, boolean useOtherFiltering, boolean useColumnIndexFilter, long rangeStart, long rangeEnd) throws IOException { return PhoneBookWriter.readUsers(ParquetReader.builder(new GroupReadSupport(), file) .withFilter(filter) .useDictionaryFilter(useOtherFiltering) .useStatsFilter(useOtherFiltering) .useRecordFilter(useOtherFiltering) - .useColumnIndexFilter(useColumnIndexFilter), true); + .useColumnIndexFilter(useColumnIndexFilter) + .withFileRange(rangeStart, rangeEnd), true); } @Test @@ -157,6 +165,15 @@ public void testCurrentRowIndex() throws Exception { assertEquals(reader.getCurrentRowIndex(), -1); } + @Test + public void testRangeFiltering() throws Exception { + // The readUsers also validates the rowIndex for each returned row. 
+ readUsers(FilterCompat.NOOP, false, false, this.fileSize / 2, this.fileSize); + readUsers(FilterCompat.NOOP, true, false, this.fileSize / 3, this.fileSize * 3 / 4); + readUsers(FilterCompat.NOOP, false, true, this.fileSize / 4, this.fileSize / 2); + readUsers(FilterCompat.NOOP, true, true, this.fileSize * 3 / 4, this.fileSize); + } + @Test public void testSimpleFiltering() throws Exception { Set idSet = new HashSet<>(); From a00d1d6613a116c0332a0b5e0446d0904acc3b97 Mon Sep 17 00:00:00 2001 From: Ala Luszczak Date: Wed, 29 Jun 2022 17:46:27 +0200 Subject: [PATCH 2/2] Adjust assert message --- .../org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java index 8a8600987f..f68dce4256 100644 --- a/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java +++ b/parquet-hadoop/src/test/java/org/apache/parquet/filter2/recordlevel/PhoneBookWriter.java @@ -359,7 +359,7 @@ public static List readUsers(ParquetReader.Builder builder, boolean User u = userFromGroup(group); users.add(u); if (validateRowIndexes) { - assertEquals("validating row index", u.id, reader.getCurrentRowIndex()); + assertEquals("Row index should be equal to User id", u.id, reader.getCurrentRowIndex()); } } return users;