From 6a0fd2f87d85aee37f4b7485494b325043876d7f Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Tue, 31 Dec 2024 14:56:17 +0800 Subject: [PATCH] [parquet] Fix file index result filter the row ranges missing rowgroup offset problem --- .../table/AppendOnlyFileStoreTableTest.java | 29 ++++++++++--------- .../parquet/hadoop/ParquetFileReader.java | 1 + .../columnindex/ColumnIndexFilter.java | 7 ++++- .../filter2/columnindex/RowRanges.java | 13 +++++---- 4 files changed, 30 insertions(+), 20 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java index dd85bf8bcfcf..f2bd0c5ea9c8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java @@ -73,6 +73,7 @@ import java.util.Map; import java.util.Optional; import java.util.Random; +import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.Function; @@ -731,7 +732,7 @@ public void testBSIAndBitmapIndexInDisk() throws Exception { public void testBitmapIndexResultFilterParquetRowRanges() throws Exception { RowType rowType = RowType.builder() - .field("id", DataTypes.INT()) + .field("id", DataTypes.STRING()) .field("event", DataTypes.STRING()) .field("price", DataTypes.INT()) .build(); @@ -749,26 +750,26 @@ public void testBitmapIndexResultFilterParquetRowRanges() throws Exception { + "." + CoreOptions.COLUMNS, "price"); + options.set(ParquetOutputFormat.BLOCK_SIZE, "1048576"); options.set( ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100"); options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300"); }); - int bound = 3000; + int bound = 300000; Random random = new Random(); Map expectedMap = new HashMap<>(); - for (int i = 0; i < 5; i++) { - StreamTableWrite write = table.newWrite(commitUser); - StreamTableCommit commit = table.newCommit(commitUser); - for (int j = 0; j < 10000; j++) { - int next = random.nextInt(bound); - expectedMap.compute(next, (key, value) -> value == null ? 1 : value + 1); - write.write(GenericRow.of(1, BinaryString.fromString("A"), next)); - } - commit.commit(i, write.prepareCommit(true, i)); - write.close(); - commit.close(); + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + for (int j = 0; j < 1000000; j++) { + int next = random.nextInt(bound); + BinaryString uuid = BinaryString.fromString(UUID.randomUUID().toString()); + expectedMap.compute(next, (key, value) -> value == null ? 1 : value + 1); + write.write(GenericRow.of(uuid, uuid, next)); } + commit.commit(0, write.prepareCommit(true, 0)); + write.close(); + commit.close(); // test eq for (int i = 0; i < 10; i++) { @@ -789,7 +790,7 @@ public void testBitmapIndexResultFilterParquetRowRanges() throws Exception { // test between for (int i = 0; i < 10; i++) { - int max = random.nextInt(bound); + int max = random.nextInt(bound) + 1; int min = random.nextInt(max); Predicate predicate = PredicateBuilder.and( diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index e9f757126afa..c0a28cdc9bb9 100644 --- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -761,6 +761,7 @@ private RowRanges getRowRanges(int blockIndex) { getColumnIndexStore(blockIndex), paths.keySet(), blocks.get(blockIndex).getRowCount(), + blocks.get(blockIndex).getRowIndexOffset(), fileIndexResult); blockRowRanges.set(blockIndex, rowRanges); } diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java index b2c9365bd646..db21a8961a04 100644 --- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java @@ -67,6 +67,7 @@ public class ColumnIndexFilter implements Visitor { private final ColumnIndexStore columnIndexStore; private final Set columns; private final long rowCount; + private final long rowIndexOffset; @Nullable private final FileIndexResult fileIndexResult; private RowRanges allRows; @@ -89,6 +90,7 @@ public static RowRanges calculateRowRanges( ColumnIndexStore columnIndexStore, Set paths, long rowCount, + long rowIndexOffset, @Nullable FileIndexResult fileIndexResult) { return filter.accept( new FilterCompat.Visitor() { @@ -102,6 +104,7 @@ public RowRanges visit(FilterPredicateCompat filterPredicateCompat) { columnIndexStore, paths, rowCount, + rowIndexOffset, fileIndexResult)); } catch (MissingOffsetIndexException e) { LOGGER.info(e.getMessage()); @@ -125,10 +128,12 @@ private ColumnIndexFilter( ColumnIndexStore columnIndexStore, Set paths, long rowCount, + long rowIndexOffset, @Nullable FileIndexResult fileIndexResult) { this.columnIndexStore = columnIndexStore; this.columns = paths; this.rowCount = rowCount; + this.rowIndexOffset = rowIndexOffset; this.fileIndexResult = fileIndexResult; } @@ -227,7 +232,7 @@ private RowRanges applyPredicate( return allRows(); } - return RowRanges.create(rowCount, func.apply(ci), oi, fileIndexResult); + return RowRanges.create(rowCount, rowIndexOffset, func.apply(ci), oi, fileIndexResult); } @Override diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java index 6963814831d4..b199f93958df 100644 --- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java @@ -41,10 +41,11 @@ * row-group, retrieve the count of the matching rows or check overlapping of a row index range. * *

Note: The class was copied over to support using {@link FileIndexResult} to filter {@link - * RowRanges}. Added a new method {@link RowRanges#create(long, PrimitiveIterator.OfInt, + * RowRanges}. Added a new method {@link RowRanges#create(long, long, PrimitiveIterator.OfInt, * OffsetIndex, FileIndexResult)} * - * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, long, FileIndexResult) + * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, long, long, + * FileIndexResult) */ public class RowRanges { @@ -165,6 +166,7 @@ public static RowRanges create( /** Support using {@link FileIndexResult} to filter the row ranges. */ public static RowRanges create( long rowCount, + long rowIndexOffset, PrimitiveIterator.OfInt pageIndexes, OffsetIndex offsetIndex, @Nullable FileIndexResult fileIndexResult) { @@ -178,13 +180,14 @@ public static RowRanges create( if (fileIndexResult instanceof BitmapIndexResult) { RoaringBitmap32 bitmap = ((BitmapIndexResult) fileIndexResult).get(); RoaringBitmap32 range = - RoaringBitmap32.bitmapOfRange(firstRowIndex, lastRowIndex + 1); + RoaringBitmap32.bitmapOfRange( + rowIndexOffset + firstRowIndex, rowIndexOffset + lastRowIndex + 1); RoaringBitmap32 result = RoaringBitmap32.and(bitmap, range); if (result.isEmpty()) { continue; } - firstRowIndex = result.first(); - lastRowIndex = result.last(); + firstRowIndex = result.first() - rowIndexOffset; + lastRowIndex = result.last() - rowIndexOffset; } ranges.add(new Range(firstRowIndex, lastRowIndex));