From 3ba1d4b30c0d262d59df083b0e56cb05b2b6d0e1 Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Thu, 2 Jan 2025 11:08:26 +0800 Subject: [PATCH 01/17] [parquet] Support deletion vector result pushdown --- .../paimon/format/FormatReaderContext.java | 17 +- .../paimon/format/FormatReaderFactory.java | 4 + .../deletionvectors/BitmapDeletionVector.java | 4 + .../paimon/operation/RawFileSplitRead.java | 22 ++- .../table/PrimaryKeyFileStoreTableTest.java | 175 ++++++++++-------- ...CompactedChangelogFormatReaderFactory.java | 6 + .../format/parquet/ParquetReaderFactory.java | 3 +- .../paimon/format/parquet/ParquetUtil.java | 9 +- .../parquet/hadoop/ParquetFileReader.java | 22 ++- .../columnindex/ColumnIndexFilter.java | 19 +- .../filter2/columnindex/RowRanges.java | 30 ++- .../parquet/ParquetFormatReadWriteTest.java | 2 +- 12 files changed, 211 insertions(+), 102 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java index cae6a977e615..3858552a6712 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java @@ -22,6 +22,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.utils.RoaringBitmap32; import javax.annotation.Nullable; @@ -32,17 +33,23 @@ public class FormatReaderContext implements FormatReaderFactory.Context { private final Path file; private final long fileSize; @Nullable private final FileIndexResult fileIndexResult; + @Nullable private final RoaringBitmap32 deletion; public FormatReaderContext(FileIO fileIO, Path file, long fileSize) { - this(fileIO, file, fileSize, null); + this(fileIO, file, fileSize, null, null); } public FormatReaderContext( - FileIO fileIO, Path file, long fileSize, @Nullable FileIndexResult 
fileIndexResult) { + FileIO fileIO, + Path file, + long fileSize, + @Nullable FileIndexResult fileIndexResult, + @Nullable RoaringBitmap32 deletion) { this.fileIO = fileIO; this.file = file; this.fileSize = fileSize; this.fileIndexResult = fileIndexResult; + this.deletion = deletion; } @Override @@ -65,4 +72,10 @@ public long fileSize() { public FileIndexResult fileIndex() { return fileIndexResult; } + + @Nullable + @Override + public RoaringBitmap32 deletion() { + return deletion; + } } diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java index 5ef084ec4d34..2c6470af9e8c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java @@ -24,6 +24,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.reader.FileRecordReader; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.utils.RoaringBitmap32; import javax.annotation.Nullable; @@ -45,5 +46,8 @@ interface Context { @Nullable FileIndexResult fileIndex(); + + @Nullable + RoaringBitmap32 deletion(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java index 55e0a975e303..40a509c54fbb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java @@ -93,6 +93,10 @@ public byte[] serializeToBytes() { } } + public RoaringBitmap32 get() { + return roaringBitmap.clone(); + } + public static DeletionVector deserializeFromByteBuffer(ByteBuffer buffer) throws IOException { RoaringBitmap32 bitmap = new RoaringBitmap32(); bitmap.deserialize(buffer); diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index d0f3275b5afc..1af0a88a0c80 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader; +import org.apache.paimon.deletionvectors.BitmapDeletionVector; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.disk.IOManager; import org.apache.paimon.fileindex.FileIndexResult; @@ -50,6 +51,7 @@ import org.apache.paimon.utils.FormatReaderMapping; import org.apache.paimon.utils.FormatReaderMapping.Builder; import org.apache.paimon.utils.IOExceptionSupplier; +import org.apache.paimon.utils.RoaringBitmap32; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -208,9 +210,26 @@ private FileRecordReader createFileReader( } } + RoaringBitmap32 deletion = null; + DeletionVector deletionVector = dvFactory == null ? 
null : dvFactory.get(); + if (deletionVector instanceof BitmapDeletionVector) { + deletion = ((BitmapDeletionVector) deletionVector).get(); + } + + if (deletion != null && fileIndexResult instanceof BitmapIndexResult) { + RoaringBitmap32 selection = ((BitmapIndexResult) fileIndexResult).get(); + if (RoaringBitmap32.andNot(selection, deletion).isEmpty()) { + return new EmptyFileRecordReader<>(); + } + } + FormatReaderContext formatReaderContext = new FormatReaderContext( - fileIO, dataFilePathFactory.toPath(file), file.fileSize(), fileIndexResult); + fileIO, + dataFilePathFactory.toPath(file), + file.fileSize(), + fileIndexResult, + deletion); FileRecordReader fileRecordReader = new DataFileRecordReader( formatReaderMapping.getReaderFactory(), @@ -225,7 +244,6 @@ private FileRecordReader createFileReader( fileRecordReader, (BitmapIndexResult) fileIndexResult); } - DeletionVector deletionVector = dvFactory == null ? null : dvFactory.get(); if (deletionVector != null && !deletionVector.isEmpty()) { return new ApplyDeletionVectorReader(fileRecordReader, deletionVector); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index fa635e2ab666..46068bf3e276 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -31,11 +31,13 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.BundleRecords; import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.reader.RecordReader; import 
org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.SchemaUtils; @@ -58,6 +60,7 @@ import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.StreamTableScan; import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.table.source.TableScan; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.table.system.AuditLogTable; import org.apache.paimon.table.system.FileMonitorTable; @@ -68,6 +71,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Pair; +import org.apache.parquet.hadoop.ParquetOutputFormat; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -85,8 +89,10 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -100,7 +106,7 @@ import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP; import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED; import static org.apache.paimon.CoreOptions.FILE_FORMAT; -import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD; +import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET; import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE; import static org.apache.paimon.CoreOptions.MERGE_ENGINE; import static org.apache.paimon.CoreOptions.MergeEngine; @@ -964,89 +970,110 @@ public void testDeletionVectorsWithFileIndexInMeta() throws Exception { } @Test - public void testDeletionVectorsWithBitmapFileIndexInFile() throws Exception { + public void testDeletionVectorsCombineWithFileIndexPushDownParquet() throws Exception { FileStoreTable table = 
createFileStoreTable( conf -> { conf.set(BUCKET, 1); + conf.set(FILE_FORMAT, FILE_FORMAT_PARQUET); conf.set(DELETION_VECTORS_ENABLED, true); - conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1)); - conf.set(FILE_INDEX_IN_MANIFEST_THRESHOLD, MemorySize.ofBytes(1)); + conf.set(ParquetOutputFormat.BLOCK_SIZE, "1048576"); + conf.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100"); + conf.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300"); conf.set("file-index.bitmap.columns", "b"); }); - StreamTableWrite write = - table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString())); - StreamTableCommit commit = table.newCommit(commitUser); - - write.write(rowData(1, 1, 300L)); - write.write(rowData(1, 2, 400L)); - write.write(rowData(1, 3, 100L)); - write.write(rowData(1, 4, 100L)); - commit.commit(0, write.prepareCommit(true, 0)); - - write.write(rowData(1, 1, 100L)); - write.write(rowData(1, 2, 100L)); - write.write(rowData(1, 3, 300L)); - write.write(rowData(1, 5, 100L)); - commit.commit(1, write.prepareCommit(true, 1)); - - write.write(rowData(1, 4, 200L)); - commit.commit(2, write.prepareCommit(true, 2)); - - PredicateBuilder builder = new PredicateBuilder(ROW_TYPE); - List splits = toSplits(table.newSnapshotReader().read().dataSplits()); - assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2); - TableRead read = table.newRead().withFilter(builder.equal(2, 100L)); - assertThat(getResult(read, splits, BATCH_ROW_TO_STRING)) - .hasSameElementsAs( - Arrays.asList( - "1|1|100|binary|varbinary|mapKey:mapVal|multiset", - "1|2|100|binary|varbinary|mapKey:mapVal|multiset", - "1|5|100|binary|varbinary|mapKey:mapVal|multiset")); - } - - @Test - public void testDeletionVectorsWithBitmapFileIndexInMeta() throws Exception { - FileStoreTable table = - createFileStoreTable( - conf -> { - conf.set(BUCKET, 1); - conf.set(DELETION_VECTORS_ENABLED, true); - conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1)); - 
conf.set(FILE_INDEX_IN_MANIFEST_THRESHOLD, MemorySize.ofMebiBytes(1)); - conf.set("file-index.bitmap.columns", "b"); - }); + BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder(); + BatchTableWrite batchWrite; + BatchTableCommit batchCommit; - StreamTableWrite write = - table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString())); - StreamTableCommit commit = table.newCommit(commitUser); - - write.write(rowData(1, 1, 300L)); - write.write(rowData(1, 2, 400L)); - write.write(rowData(1, 3, 100L)); - write.write(rowData(1, 4, 100L)); - commit.commit(0, write.prepareCommit(true, 0)); - - write.write(rowData(1, 1, 100L)); - write.write(rowData(1, 2, 100L)); - write.write(rowData(1, 3, 300L)); - write.write(rowData(1, 5, 100L)); - commit.commit(1, write.prepareCommit(true, 1)); - - write.write(rowData(1, 4, 200L)); - commit.commit(2, write.prepareCommit(true, 2)); + // test bitmap index and deletion vector merge and using EmptyFileRecordReader to read + batchWrite = + (BatchTableWrite) + batchWriteBuilder + .newWrite() + .withIOManager(new IOManagerImpl(tempDir.toString())); + batchCommit = batchWriteBuilder.newCommit(); + batchWrite.write(rowData(1, 1, 300L)); + batchWrite.write(rowData(1, 2, 400L)); + batchWrite.write(rowData(1, 3, 100L)); + batchWrite.write(rowData(1, 4, 100L)); + batchCommit.commit(batchWrite.prepareCommit()); + batchWrite.close(); + batchCommit.close(); + + batchWrite = + (BatchTableWrite) + batchWriteBuilder + .newWrite() + .withIOManager(new IOManagerImpl(tempDir.toString())); + batchCommit = batchWriteBuilder.newCommit(); + batchWrite.write(rowDataWithKind(RowKind.DELETE, 1, 1, 300L)); + batchCommit.commit(batchWrite.prepareCommit()); + batchWrite.close(); + batchCommit.close(); + + Predicate predicate = new PredicateBuilder(table.rowType()).equal(2, 300L); + TableScan.Plan plan = table.newScan().plan(); + RecordReader reader = + table.newRead().withFilter(predicate).createReader(plan.splits()); + AtomicInteger cnt 
= new AtomicInteger(0); + reader.forEachRemaining(row -> cnt.incrementAndGet()); + assertThat(cnt.get()).isEqualTo(0); + reader.close(); + + // truncate table + FileStoreCommit truncateCommit = table.store().newCommit(UUID.randomUUID().toString()); + truncateCommit.truncateTable(2); + truncateCommit.close(); + + // test parquet row ranges filtering + int bound = 100; + Random random = new Random(); + Map expectedMap = new HashMap<>(); + batchWrite = + (BatchTableWrite) + batchWriteBuilder + .newWrite() + .withIOManager(new IOManagerImpl(tempDir.toString())); + batchCommit = batchWriteBuilder.newCommit(); + for (int i = 0; i < 10000; i++) { + long next = random.nextInt(bound); + expectedMap.put(i, next); + batchWrite.write(rowData(1, i, next)); + } + batchCommit.commit(batchWrite.prepareCommit()); + batchWrite.close(); + batchCommit.close(); - PredicateBuilder builder = new PredicateBuilder(ROW_TYPE); - List splits = toSplits(table.newSnapshotReader().read().dataSplits()); - assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2); - TableRead read = table.newRead().withFilter(builder.equal(2, 100L)); - assertThat(getResult(read, splits, BATCH_ROW_TO_STRING)) - .hasSameElementsAs( - Arrays.asList( - "1|1|100|binary|varbinary|mapKey:mapVal|multiset", - "1|2|100|binary|varbinary|mapKey:mapVal|multiset", - "1|5|100|binary|varbinary|mapKey:mapVal|multiset")); + batchWrite = + (BatchTableWrite) + batchWriteBuilder + .newWrite() + .withIOManager(new IOManagerImpl(tempDir.toString())); + batchCommit = batchWriteBuilder.newCommit(); + for (int i = 2500; i < 5000; i++) { + batchWrite.write(rowDataWithKind(RowKind.DELETE, 1, i, expectedMap.remove(i))); + } + batchCommit.commit(batchWrite.prepareCommit()); + batchWrite.close(); + batchCommit.close(); + + for (int i = 0; i < bound; i++) { + long next = i; + predicate = new PredicateBuilder(table.rowType()).equal(2, next); + plan = table.newScan().plan(); + reader = 
table.newRead().withFilter(predicate).createReader(plan.splits()); + AtomicLong expectedCnt = new AtomicLong(0); + reader.forEachRemaining(row -> { + expectedCnt.incrementAndGet(); + assertThat(row.getLong(2)).isEqualTo(expectedMap.get(row.getInt(1))); + }); + long count = + expectedMap.entrySet().stream().filter(x -> x.getValue().equals(next)).count(); + assertThat(expectedCnt.get()).isEqualTo(count); + reader.close(); + } } @Test diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java index e0aed448db93..9a5753311aef 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java @@ -28,6 +28,7 @@ import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.utils.RoaringBitmap32; import java.io.EOFException; import java.io.IOException; @@ -86,6 +87,11 @@ public long fileSize() { public FileIndexResult fileIndex() { return context.fileIndex(); } + + @Override + public RoaringBitmap32 deletion() { + return context.deletion(); + } }); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index 9fef7563718c..9b173fe13a44 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -120,7 +120,8 @@ public FileRecordReader 
createReader(FormatReaderFactory.Context co new ParquetFileReader( ParquetInputFile.fromPath(context.fileIO(), context.filePath()), builder.build(), - context.fileIndex()); + context.fileIndex(), + context.deletion()); MessageType fileSchema = reader.getFileMetaData().getSchema(); MessageType requestedSchema = clipParquetSchema(fileSchema); reader.setRequestedSchema(requestedSchema); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java index 82d19e448878..2d30051cde3d 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java @@ -18,7 +18,6 @@ package org.apache.paimon.format.parquet; -import org.apache.paimon.fileindex.FileIndexResult; import org.apache.paimon.format.SimpleStatsExtractor; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; @@ -49,7 +48,7 @@ public class ParquetUtil { */ public static Pair>, SimpleStatsExtractor.FileInfo> extractColumnStats(FileIO fileIO, Path path) throws IOException { - try (ParquetFileReader reader = getParquetReader(fileIO, path, null)) { + try (ParquetFileReader reader = getParquetReader(fileIO, path)) { ParquetMetadata parquetMetadata = reader.getFooter(); List blockMetaDataList = parquetMetadata.getBlocks(); Map> resultStats = new HashMap<>(); @@ -78,12 +77,12 @@ public class ParquetUtil { * @param path the path of parquet files to be read * @return parquet reader, used for reading footer, status, etc. 
*/ - public static ParquetFileReader getParquetReader( - FileIO fileIO, Path path, FileIndexResult fileIndexResult) throws IOException { + public static ParquetFileReader getParquetReader(FileIO fileIO, Path path) throws IOException { return new ParquetFileReader( ParquetInputFile.fromPath(fileIO, path), ParquetReadOptions.builder().build(), - fileIndexResult); + null, + null); } static void assertStatsClass( diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index c0a28cdc9bb9..8e26460f54d7 100644 --- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -80,6 +80,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.Closeable; import java.io.IOException; import java.io.InputStream; @@ -220,6 +222,8 @@ private static ParquetMetadata readFooter( private final List blockIndexStores; private final List blockRowRanges; private final boolean blocksFiltered; + @Nullable private final FileIndexResult fileIndexResult; + @Nullable private final RoaringBitmap32 deletion; // not final. in some cases, this may be lazily loaded for backward-compat. 
private ParquetMetadata footer; @@ -228,15 +232,18 @@ private static ParquetMetadata readFooter( private ColumnChunkPageReadStore currentRowGroup = null; private DictionaryPageReader nextDictionaryReader = null; - private InternalFileDecryptor fileDecryptor = null; - private FileIndexResult fileIndexResult; + private InternalFileDecryptor fileDecryptor; public ParquetFileReader( - InputFile file, ParquetReadOptions options, FileIndexResult fileIndexResult) + InputFile file, + ParquetReadOptions options, + @Nullable FileIndexResult fileIndexResult, + @Nullable RoaringBitmap32 deletion) throws IOException { this.converter = new ParquetMetadataConverter(options); this.file = (ParquetInputFile) file; this.fileIndexResult = fileIndexResult; + this.deletion = deletion; this.f = this.file.newStream(); this.options = options; try { @@ -360,13 +367,15 @@ private List filterRowGroups(List blocks) throws I blocks = RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this); } if (fileIndexResult instanceof BitmapIndexResult) { - RoaringBitmap32 bitmap = ((BitmapIndexResult) fileIndexResult).get(); + RoaringBitmap32 selection = ((BitmapIndexResult) fileIndexResult).get(); + RoaringBitmap32 result = + deletion == null ? 
selection : RoaringBitmap32.andNot(selection, deletion); blocks = blocks.stream() .filter( it -> { long rowIndexOffset = it.getRowIndexOffset(); - return bitmap.rangeCardinality( + return result.rangeCardinality( rowIndexOffset, rowIndexOffset + it.getRowCount()) > 0; @@ -762,7 +771,8 @@ private RowRanges getRowRanges(int blockIndex) { paths.keySet(), blocks.get(blockIndex).getRowCount(), blocks.get(blockIndex).getRowIndexOffset(), - fileIndexResult); + fileIndexResult, + deletion); blockRowRanges.set(blockIndex, rowRanges); } return rowRanges; diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java index db21a8961a04..02e9ef1f3413 100644 --- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java @@ -19,6 +19,7 @@ package org.apache.parquet.internal.filter2.columnindex; import org.apache.paimon.fileindex.FileIndexResult; +import org.apache.paimon.utils.RoaringBitmap32; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat; @@ -69,6 +70,7 @@ public class ColumnIndexFilter implements Visitor { private final long rowCount; private final long rowIndexOffset; @Nullable private final FileIndexResult fileIndexResult; + @Nullable private final RoaringBitmap32 deletion; private RowRanges allRows; /** @@ -80,8 +82,10 @@ public class ColumnIndexFilter implements Visitor { * @param paths the paths of the columns used in the actual projection; a column not being part * of the projection will be handled as containing {@code null} values only even if the * column has values written in the file - * @param fileIndexResult the file index result; it will use to filter row ranges * @param rowCount the total 
number of rows in the row-group + * @param rowIndexOffset the offset of the row-group + * @param fileIndexResult the file index result; used to filter or narrow the row ranges + * @param deletion the deletion vector result; used to filter or narrow the row ranges * @return the ranges of the possible matching row indexes; the returned ranges will contain all * the rows if any of the required offset index is missing */ @@ -91,7 +95,8 @@ public static RowRanges calculateRowRanges( Set paths, long rowCount, long rowIndexOffset, - @Nullable FileIndexResult fileIndexResult) { + @Nullable FileIndexResult fileIndexResult, + @Nullable RoaringBitmap32 deletion) { return filter.accept( new FilterCompat.Visitor() { @Override @@ -105,7 +110,8 @@ public RowRanges visit(FilterPredicateCompat filterPredicateCompat) { paths, rowCount, rowIndexOffset, - fileIndexResult)); + fileIndexResult, + deletion)); } catch (MissingOffsetIndexException e) { LOGGER.info(e.getMessage()); return RowRanges.createSingle(rowCount); @@ -129,12 +135,14 @@ private ColumnIndexFilter( Set paths, long rowCount, long rowIndexOffset, - @Nullable FileIndexResult fileIndexResult) { + @Nullable FileIndexResult fileIndexResult, + @Nullable RoaringBitmap32 deletion) { this.columnIndexStore = columnIndexStore; this.columns = paths; this.rowCount = rowCount; this.rowIndexOffset = rowIndexOffset; this.fileIndexResult = fileIndexResult; + this.deletion = deletion; } private RowRanges allRows() { @@ -232,7 +240,8 @@ private RowRanges applyPredicate( return allRows(); } - return RowRanges.create(rowCount, rowIndexOffset, func.apply(ci), oi, fileIndexResult); + return RowRanges.create( + rowCount, rowIndexOffset, func.apply(ci), oi, fileIndexResult, deletion); } @Override diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java index b199f93958df..715dac5f364e 100644
--- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java @@ -40,12 +40,13 @@ * column index based filtering. To be used iterate over the matching row indexes to be read from a * row-group, retrieve the count of the matching rows or check overlapping of a row index range. * - *

Note: The class was copied over to support using {@link FileIndexResult} to filter {@link - * RowRanges}. Added a new method {@link RowRanges#create(long, long, PrimitiveIterator.OfInt, - * OffsetIndex, FileIndexResult)} + *

Note: The class was copied over to support using {@link FileIndexResult} and deletion vector + * result to filter or narrow the {@link RowRanges}. Added a new method {@link + * RowRanges#create(long, long, PrimitiveIterator.OfInt, OffsetIndex, FileIndexResult, + * RoaringBitmap32)} * * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, long, long, - * FileIndexResult) + * FileIndexResult, RoaringBitmap32) */ public class RowRanges { @@ -163,13 +164,17 @@ public static RowRanges create( return ranges; } - /** Support using {@link FileIndexResult} to filter the row ranges. */ + /** + * Support using the {@link FileIndexResult} and the deletion vector result to filter the row + * ranges. + */ public static RowRanges create( long rowCount, long rowIndexOffset, PrimitiveIterator.OfInt pageIndexes, OffsetIndex offsetIndex, - @Nullable FileIndexResult fileIndexResult) { + @Nullable FileIndexResult fileIndexResult, + @Nullable RoaringBitmap32 deletion) { RowRanges ranges = new RowRanges(); while (pageIndexes.hasNext()) { int pageIndex = pageIndexes.nextInt(); @@ -190,6 +195,19 @@ public static RowRanges create( lastRowIndex = result.last() - rowIndexOffset; } + // using deletion vector result to filter or narrow the row ranges + if (deletion != null) { + RoaringBitmap32 range = + RoaringBitmap32.bitmapOfRange( + rowIndexOffset + firstRowIndex, rowIndexOffset + lastRowIndex + 1); + RoaringBitmap32 result = RoaringBitmap32.andNot(range, deletion); + if (result.isEmpty()) { + continue; + } + firstRowIndex = result.first() - rowIndexOffset; + lastRowIndex = result.last() - rowIndexOffset; + } + ranges.add(new Range(firstRowIndex, lastRowIndex)); } return ranges; diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java index e0d1d240a9fd..221d524fff5c 100644 --- 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java @@ -75,7 +75,7 @@ public void testEnableBloomFilter(boolean enabled) throws Exception { writer.close(); out.close(); - try (ParquetFileReader reader = ParquetUtil.getParquetReader(fileIO, file, null)) { + try (ParquetFileReader reader = ParquetUtil.getParquetReader(fileIO, file)) { ParquetMetadata parquetMetadata = reader.getFooter(); List blockMetaDataList = parquetMetadata.getBlocks(); for (BlockMetaData blockMetaData : blockMetaDataList) { From 35a979879f09e090b1ed77e94781c153a5dd77ab Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Thu, 2 Jan 2025 11:09:54 +0800 Subject: [PATCH 02/17] mvn spotless:apply --- .../paimon/table/PrimaryKeyFileStoreTableTest.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index 46068bf3e276..ae2ca611520b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -1065,10 +1065,11 @@ public void testDeletionVectorsCombineWithFileIndexPushDownParquet() throws Exce plan = table.newScan().plan(); reader = table.newRead().withFilter(predicate).createReader(plan.splits()); AtomicLong expectedCnt = new AtomicLong(0); - reader.forEachRemaining(row -> { - expectedCnt.incrementAndGet(); - assertThat(row.getLong(2)).isEqualTo(expectedMap.get(row.getInt(1))); - }); + reader.forEachRemaining( + row -> { + expectedCnt.incrementAndGet(); + assertThat(row.getLong(2)).isEqualTo(expectedMap.get(row.getInt(1))); + }); long count = expectedMap.entrySet().stream().filter(x -> x.getValue().equals(next)).count(); 
assertThat(expectedCnt.get()).isEqualTo(count); From 305d1e79399d3522facbaf802b4dc1f2262f6e46 Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Thu, 2 Jan 2025 13:42:11 +0800 Subject: [PATCH 03/17] deletion vector result support filter row group --- .../table/PrimaryKeyFileStoreTableTest.java | 47 +++++++++---------- .../parquet/hadoop/ParquetFileReader.java | 16 +++++++ 2 files changed, 37 insertions(+), 26 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index ae2ca611520b..caaf9d8508dc 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -869,11 +869,14 @@ public void testDeletionVectorsWithParquetFilter() throws Exception { writeBuilder .newWrite() .withIOManager(new IOManagerImpl(tempDir.toString())); - for (int i = 110000; i < 115000; i++) { + + // test row ranges filter + for (int i = 1000; i < 6000; i++) { write.write(rowDataWithKind(RowKind.DELETE, 1, i, i * 100L)); } - for (int i = 130000; i < 135000; i++) { + // test deletion vector filter row group + for (int i = 93421; i < 187795; i++) { write.write(rowDataWithKind(RowKind.DELETE, 1, i, i * 100L)); } @@ -886,9 +889,8 @@ public void testDeletionVectorsWithParquetFilter() throws Exception { Random random = new Random(); // point filter - for (int i = 0; i < 10; i++) { - int value = random.nextInt(110000); + int value = 6000 + random.nextInt(93421 - 6000); TableRead read = table.newRead().withFilter(builder.equal(1, value)).executeFilter(); assertThat(getResult(read, splits, BATCH_ROW_TO_STRING)) .isEqualTo( @@ -899,7 +901,7 @@ public void testDeletionVectorsWithParquetFilter() throws Exception { } for (int i = 0; i < 10; i++) { - int value = 130000 + random.nextInt(5000); + int value = 93421 + random.nextInt(187795 - 93421); 
TableRead read = table.newRead().withFilter(builder.equal(1, value)).executeFilter(); assertThat(getResult(read, splits, BATCH_ROW_TO_STRING)).isEmpty(); } @@ -908,29 +910,22 @@ public void testDeletionVectorsWithParquetFilter() throws Exception { table.newRead() .withFilter( PredicateBuilder.and( - builder.greaterOrEqual(1, 100000), - builder.lessThan(1, 150000))) + builder.greaterOrEqual(1, 90000), + builder.lessThan(1, 200000))) .executeFilter(); List result = getResult(tableRead, splits, BATCH_ROW_TO_STRING); - assertThat(result.size()).isEqualTo(40000); // filter 10000 + assertThat(result.size()).isEqualTo((200000 - 90000) - (187795 - 93421)); - assertThat(result) - .doesNotContain("1|110000|11000000|binary|varbinary|mapKey:mapVal|multiset"); - assertThat(result) - .doesNotContain("1|114999|11499900|binary|varbinary|mapKey:mapVal|multiset"); - assertThat(result) - .doesNotContain("1|130000|13000000|binary|varbinary|mapKey:mapVal|multiset"); - assertThat(result) - .doesNotContain("1|134999|13499900|binary|varbinary|mapKey:mapVal|multiset"); - assertThat(result).contains("1|100000|10000000|binary|varbinary|mapKey:mapVal|multiset"); - assertThat(result).contains("1|149999|14999900|binary|varbinary|mapKey:mapVal|multiset"); - - assertThat(result).contains("1|101099|10109900|binary|varbinary|mapKey:mapVal|multiset"); - assertThat(result).contains("1|115000|11500000|binary|varbinary|mapKey:mapVal|multiset"); - assertThat(result).contains("1|129999|12999900|binary|varbinary|mapKey:mapVal|multiset"); - assertThat(result).contains("1|135000|13500000|binary|varbinary|mapKey:mapVal|multiset"); + assertThat(result).doesNotContain("1|93421|9342100|binary|varbinary|mapKey:mapVal|multiset"); + assertThat(result).doesNotContain("1|187794|18779400|binary|varbinary|mapKey:mapVal|multiset"); + assertThat(result).doesNotContain("1|200000|20000000|binary|varbinary|mapKey:mapVal|multiset"); + + 
assertThat(result).contains("1|199999|19999900|binary|varbinary|mapKey:mapVal|multiset"); + assertThat(result).contains("1|90000|9000000|binary|varbinary|mapKey:mapVal|multiset"); + assertThat(result).contains("1|187795|18779500|binary|varbinary|mapKey:mapVal|multiset"); + assertThat(result).contains("1|93420|9342000|binary|varbinary|mapKey:mapVal|multiset"); } @Test @@ -977,7 +972,7 @@ public void testDeletionVectorsCombineWithFileIndexPushDownParquet() throws Exce conf.set(BUCKET, 1); conf.set(FILE_FORMAT, FILE_FORMAT_PARQUET); conf.set(DELETION_VECTORS_ENABLED, true); - conf.set(ParquetOutputFormat.BLOCK_SIZE, "1048576"); + conf.set(ParquetOutputFormat.BLOCK_SIZE, "524288"); conf.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100"); conf.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300"); conf.set("file-index.bitmap.columns", "b"); @@ -1037,7 +1032,7 @@ public void testDeletionVectorsCombineWithFileIndexPushDownParquet() throws Exce .newWrite() .withIOManager(new IOManagerImpl(tempDir.toString())); batchCommit = batchWriteBuilder.newCommit(); - for (int i = 0; i < 10000; i++) { + for (int i = 0; i < 100000; i++) { long next = random.nextInt(bound); expectedMap.put(i, next); batchWrite.write(rowData(1, i, next)); @@ -1052,7 +1047,7 @@ public void testDeletionVectorsCombineWithFileIndexPushDownParquet() throws Exce .newWrite() .withIOManager(new IOManagerImpl(tempDir.toString())); batchCommit = batchWriteBuilder.newCommit(); - for (int i = 2500; i < 5000; i++) { + for (int i = 25000; i < 50000; i++) { batchWrite.write(rowDataWithKind(RowKind.DELETE, 1, i, expectedMap.remove(i))); } batchCommit.commit(batchWrite.prepareCommit()); diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 8e26460f54d7..192e8f064392 100644 --- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ 
b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -366,6 +366,7 @@ private List filterRowGroups(List blocks) throws I } blocks = RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this); } + if (fileIndexResult instanceof BitmapIndexResult) { RoaringBitmap32 selection = ((BitmapIndexResult) fileIndexResult).get(); RoaringBitmap32 result = @@ -382,6 +383,21 @@ private List filterRowGroups(List blocks) throws I }) .collect(Collectors.toList()); } + + if (deletion != null) { + blocks = + blocks.stream() + .filter( + it -> { + long rowIndexOffset = it.getRowIndexOffset(); + RoaringBitmap32 range = + RoaringBitmap32.bitmapOfRange( + rowIndexOffset, + rowIndexOffset + it.getRowCount()); + return !RoaringBitmap32.andNot(range, deletion).isEmpty(); + }) + .collect(Collectors.toList()); + } } return blocks; From 985a58a99736d5aa6653883e1766a853c894d77c Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Thu, 2 Jan 2025 13:43:35 +0800 Subject: [PATCH 04/17] mvn spotless:apply --- .../paimon/table/PrimaryKeyFileStoreTableTest.java | 9 ++++++--- .../org/apache/parquet/hadoop/ParquetFileReader.java | 3 ++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index caaf9d8508dc..dc9954367ab1 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -918,9 +918,12 @@ public void testDeletionVectorsWithParquetFilter() throws Exception { assertThat(result.size()).isEqualTo((200000 - 90000) - (187795 - 93421)); - assertThat(result).doesNotContain("1|93421|9342100|binary|varbinary|mapKey:mapVal|multiset"); - assertThat(result).doesNotContain("1|187794|18779400|binary|varbinary|mapKey:mapVal|multiset"); - 
assertThat(result).doesNotContain("1|200000|20000000|binary|varbinary|mapKey:mapVal|multiset"); + assertThat(result) + .doesNotContain("1|93421|9342100|binary|varbinary|mapKey:mapVal|multiset"); + assertThat(result) + .doesNotContain("1|187794|18779400|binary|varbinary|mapKey:mapVal|multiset"); + assertThat(result) + .doesNotContain("1|200000|20000000|binary|varbinary|mapKey:mapVal|multiset"); assertThat(result).contains("1|199999|19999900|binary|varbinary|mapKey:mapVal|multiset"); assertThat(result).contains("1|90000|9000000|binary|varbinary|mapKey:mapVal|multiset"); diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 192e8f064392..3854b3f43989 100644 --- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -394,7 +394,8 @@ private List filterRowGroups(List blocks) throws I RoaringBitmap32.bitmapOfRange( rowIndexOffset, rowIndexOffset + it.getRowCount()); - return !RoaringBitmap32.andNot(range, deletion).isEmpty(); + return !RoaringBitmap32.andNot(range, deletion) + .isEmpty(); }) .collect(Collectors.toList()); } From 85f83f942c631759eb54b1466eecc50f370a1e10 Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Fri, 3 Jan 2025 17:38:45 +0800 Subject: [PATCH 05/17] fix --- .../parquet/hadoop/ParquetFileReader.java | 39 +++++++++------ .../columnindex/ColumnIndexFilter.java | 46 +++++++++-------- .../filter2/columnindex/RowRanges.java | 49 ++++++++++--------- 3 files changed, 72 insertions(+), 62 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index 3854b3f43989..e8f7e500c987 100644 --- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ 
b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -222,7 +222,7 @@ private static ParquetMetadata readFooter( private final List blockIndexStores; private final List blockRowRanges; private final boolean blocksFiltered; - @Nullable private final FileIndexResult fileIndexResult; + @Nullable private final RoaringBitmap32 selection; @Nullable private final RoaringBitmap32 deletion; // not final. in some cases, this may be lazily loaded for backward-compat. @@ -242,8 +242,6 @@ public ParquetFileReader( throws IOException { this.converter = new ParquetMetadataConverter(options); this.file = (ParquetInputFile) file; - this.fileIndexResult = fileIndexResult; - this.deletion = deletion; this.f = this.file.newStream(); this.options = options; try { @@ -261,6 +259,16 @@ public ParquetFileReader( this.fileDecryptor = null; // Plaintext file. No need in decryptor } + RoaringBitmap32 selection = null; + if (fileIndexResult instanceof BitmapIndexResult) { + selection = ((BitmapIndexResult) fileIndexResult).get(); + } + if (selection != null && deletion != null) { + selection = RoaringBitmap32.andNot(selection, deletion); + } + this.selection = selection; + this.deletion = deletion; + try { this.blocks = filterRowGroups(footer.getBlocks()); this.blocksFiltered = this.blocks.size() != footer.getBlocks().size(); @@ -367,24 +375,21 @@ private List filterRowGroups(List blocks) throws I blocks = RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this); } - if (fileIndexResult instanceof BitmapIndexResult) { - RoaringBitmap32 selection = ((BitmapIndexResult) fileIndexResult).get(); - RoaringBitmap32 result = - deletion == null ? 
selection : RoaringBitmap32.andNot(selection, deletion); + int size = blocks.size(); + if (selection != null) { blocks = blocks.stream() .filter( it -> { long rowIndexOffset = it.getRowIndexOffset(); - return result.rangeCardinality( + RoaringBitmap32 range = + RoaringBitmap32.bitmapOfRange( rowIndexOffset, - rowIndexOffset + it.getRowCount()) - > 0; + rowIndexOffset + it.getRowCount()); + return RoaringBitmap32.intersects(selection, range); }) .collect(Collectors.toList()); - } - - if (deletion != null) { + } else if (deletion != null) { blocks = blocks.stream() .filter( @@ -394,11 +399,13 @@ private List filterRowGroups(List blocks) throws I RoaringBitmap32.bitmapOfRange( rowIndexOffset, rowIndexOffset + it.getRowCount()); - return !RoaringBitmap32.andNot(range, deletion) - .isEmpty(); + return !deletion.contains(range); }) .collect(Collectors.toList()); } + if (size > blocks.size()) { + System.out.println("filter row group: " + (size - blocks.size())); + } } return blocks; @@ -788,7 +795,7 @@ private RowRanges getRowRanges(int blockIndex) { paths.keySet(), blocks.get(blockIndex).getRowCount(), blocks.get(blockIndex).getRowIndexOffset(), - fileIndexResult, + selection, deletion); blockRowRanges.set(blockIndex, rowRanges); } diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java index 02e9ef1f3413..3b11049a8739 100644 --- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java @@ -18,7 +18,6 @@ package org.apache.parquet.internal.filter2.columnindex; -import org.apache.paimon.fileindex.FileIndexResult; import org.apache.paimon.utils.RoaringBitmap32; import org.apache.parquet.filter2.compat.FilterCompat; @@ -59,7 +58,7 @@ * therefore a {@link 
MissingOffsetIndexException} will be thrown from any {@code visit} methods if * any of the required offset indexes is missing. * - *

Note: The class was copied over to support using {@link FileIndexResult} to filter {@link + *

Note: The class was copied over to support using {@link RoaringBitmap32} to filter {@link * RowRanges}. */ public class ColumnIndexFilter implements Visitor { @@ -69,10 +68,25 @@ public class ColumnIndexFilter implements Visitor { private final Set columns; private final long rowCount; private final long rowIndexOffset; - @Nullable private final FileIndexResult fileIndexResult; + @Nullable private final RoaringBitmap32 selection; @Nullable private final RoaringBitmap32 deletion; private RowRanges allRows; + private ColumnIndexFilter( + ColumnIndexStore columnIndexStore, + Set paths, + long rowCount, + long rowIndexOffset, + @Nullable RoaringBitmap32 selection, + @Nullable RoaringBitmap32 deletion) { + this.columnIndexStore = columnIndexStore; + this.columns = paths; + this.rowCount = rowCount; + this.rowIndexOffset = rowIndexOffset; + this.selection = selection; + this.deletion = deletion; + } + /** * Calculates the row ranges containing the indexes of the rows might match the specified * filter. 
@@ -84,8 +98,8 @@ public class ColumnIndexFilter implements Visitor { * column has values written in the file * @param rowCount the total number of rows in the row-group * @param rowIndexOffset the offset of the row-group - * @param fileIndexResult the file index result; it will use to filter or narrow the row ranges - * @param deletion the deletion vector result; it will use to filter or narrow the row ranges + * @param selection the selected position; it will use to filter or narrow the row ranges + * @param deletion the deleted position; it will use to filter the row ranges * @return the ranges of the possible matching row indexes; the returned ranges will contain all * the rows if any of the required offset index is missing */ @@ -95,7 +109,7 @@ public static RowRanges calculateRowRanges( Set paths, long rowCount, long rowIndexOffset, - @Nullable FileIndexResult fileIndexResult, + @Nullable RoaringBitmap32 selection, @Nullable RoaringBitmap32 deletion) { return filter.accept( new FilterCompat.Visitor() { @@ -110,7 +124,7 @@ public RowRanges visit(FilterPredicateCompat filterPredicateCompat) { paths, rowCount, rowIndexOffset, - fileIndexResult, + selection, deletion)); } catch (MissingOffsetIndexException e) { LOGGER.info(e.getMessage()); @@ -130,21 +144,6 @@ public RowRanges visit(NoOpFilter noOpFilter) { }); } - private ColumnIndexFilter( - ColumnIndexStore columnIndexStore, - Set paths, - long rowCount, - long rowIndexOffset, - @Nullable FileIndexResult fileIndexResult, - @Nullable RoaringBitmap32 deletion) { - this.columnIndexStore = columnIndexStore; - this.columns = paths; - this.rowCount = rowCount; - this.rowIndexOffset = rowIndexOffset; - this.fileIndexResult = fileIndexResult; - this.deletion = deletion; - } - private RowRanges allRows() { if (allRows == null) { allRows = RowRanges.createSingle(rowCount); @@ -240,8 +239,7 @@ private RowRanges applyPredicate( return allRows(); } - return RowRanges.create( - rowCount, rowIndexOffset, func.apply(ci), oi, 
fileIndexResult, deletion); + return RowRanges.create(rowCount, rowIndexOffset, func.apply(ci), oi, selection, deletion); } @Override diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java index 715dac5f364e..307c826df230 100644 --- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java @@ -19,7 +19,6 @@ package org.apache.parquet.internal.filter2.columnindex; import org.apache.paimon.fileindex.FileIndexResult; -import org.apache.paimon.fileindex.bitmap.BitmapIndexResult; import org.apache.paimon.utils.RoaringBitmap32; import org.apache.parquet.filter2.compat.FilterCompat.Filter; @@ -40,13 +39,12 @@ * column index based filtering. To be used iterate over the matching row indexes to be read from a * row-group, retrieve the count of the matching rows or check overlapping of a row index range. * - *

Note: The class was copied over to support using {@link FileIndexResult} and deletion vector - * result to filter or narrow the {@link RowRanges}. Added a new method {@link - * RowRanges#create(long, long, PrimitiveIterator.OfInt, OffsetIndex, FileIndexResult, - * RoaringBitmap32)} + *

Note: The class was copied over to support using selected position and deleted position result + * to filter or narrow the {@link RowRanges}. Added a new method {@link RowRanges#create(long, long, + * PrimitiveIterator.OfInt, OffsetIndex, RoaringBitmap32, RoaringBitmap32)} * * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, long, long, - * FileIndexResult, RoaringBitmap32) + * RoaringBitmap32, RoaringBitmap32) */ public class RowRanges { @@ -173,43 +171,38 @@ public static RowRanges create( long rowIndexOffset, PrimitiveIterator.OfInt pageIndexes, OffsetIndex offsetIndex, - @Nullable FileIndexResult fileIndexResult, + @Nullable RoaringBitmap32 selection, @Nullable RoaringBitmap32 deletion) { + int cnt = 0; RowRanges ranges = new RowRanges(); while (pageIndexes.hasNext()) { int pageIndex = pageIndexes.nextInt(); long firstRowIndex = offsetIndex.getFirstRowIndex(pageIndex); long lastRowIndex = offsetIndex.getLastRowIndex(pageIndex, rowCount); - // using file index result to filter or narrow the row ranges - if (fileIndexResult instanceof BitmapIndexResult) { - RoaringBitmap32 bitmap = ((BitmapIndexResult) fileIndexResult).get(); + if (selection != null) { RoaringBitmap32 range = RoaringBitmap32.bitmapOfRange( rowIndexOffset + firstRowIndex, rowIndexOffset + lastRowIndex + 1); - RoaringBitmap32 result = RoaringBitmap32.and(bitmap, range); - if (result.isEmpty()) { + if (!RoaringBitmap32.intersects(selection, range)) { + cnt += 1; continue; } - firstRowIndex = result.first() - rowIndexOffset; - lastRowIndex = result.last() - rowIndexOffset; - } - - // using deletion vector result to filter or narrow the row ranges - if (deletion != null) { + firstRowIndex = selection.nextValue((int) (rowIndexOffset + firstRowIndex)); + lastRowIndex = selection.previousValue((int) (rowIndexOffset + firstRowIndex + 1)); + } else if (deletion != null) { RoaringBitmap32 range = RoaringBitmap32.bitmapOfRange( rowIndexOffset + firstRowIndex, rowIndexOffset + 
lastRowIndex + 1); - RoaringBitmap32 result = RoaringBitmap32.andNot(range, deletion); - if (result.isEmpty()) { + if (deletion.contains(range)) { + cnt += 1; continue; } - firstRowIndex = result.first() - rowIndexOffset; - lastRowIndex = result.last() - rowIndexOffset; } ranges.add(new Range(firstRowIndex, lastRowIndex)); } + if (cnt > 0) System.out.println("filter row ranges: " + cnt); return ranges; } @@ -390,3 +383,15 @@ public String toString() { return ranges.toString(); } } + +//read | Best/Avg Time(ms) | Row Rate(K/s) | Per Row(ns)| Relative | filter row groups | filter row ranges +//--|--|--|--|--|--|--| +//normal-10000000-800000-788897 | 16168 / 16287 | 185.6 | 5389.3 | 1.0X | 0 | 0 +//dv-push-down-10000000-800000-788897 | 16020 / 16591 | 187.3 | 5340.0 | 1.0X | 0 | 0 +//index-push-down-10000000-800000-788897 | 819 / 857 | 3662.2 | 273.1 | 19.7X | 0 | 263 +//dv-and-index-push-down-10000000-800000-788897 | 788 / 1123 | 3806.2 | 262.7 | 20.5X | 0 | 263 +// +// 185.6 5389.3 1.0X +// 187.3 5340.0 1.0X +// 3662.2 273.1 19.7X +// 3806.2 262.7 20.5X From 69101091c4c7cf6d66a286fee55bf9025e4209bc Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Fri, 3 Jan 2025 18:37:48 +0800 Subject: [PATCH 06/17] fix --- .../apache/paimon/utils/RoaringBitmap32.java | 16 +++++++++ .../table/PrimaryKeyFileStoreTableTest.java | 5 +++ .../parquet/hadoop/ParquetFileReader.java | 4 --- .../filter2/columnindex/RowRanges.java | 36 ++++--------------- 4 files changed, 28 insertions(+), 33 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java index d3b07a2362a9..9e0035bb49dc 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java @@ -60,6 +60,10 @@ public boolean contains(int x) { return roaringBitmap.contains(x); } + public boolean contains(RoaringBitmap32 
contains) { + return roaringBitmap.contains(contains.roaringBitmap); + } + public boolean isEmpty() { return roaringBitmap.isEmpty(); } @@ -80,6 +84,14 @@ public int last() { return roaringBitmap.last(); } + public long nextValue(int fromValue) { + return roaringBitmap.nextValue(fromValue); + } + + public long previousValue(int fromValue) { + return roaringBitmap.previousValue(fromValue); + } + public RoaringBitmap32 clone() { return new RoaringBitmap32(roaringBitmap.clone()); } @@ -173,4 +185,8 @@ public RoaringBitmap next() { public static RoaringBitmap32 andNot(final RoaringBitmap32 x1, final RoaringBitmap32 x2) { return new RoaringBitmap32(RoaringBitmap.andNot(x1.roaringBitmap, x2.roaringBitmap)); } + + public static boolean intersects(final RoaringBitmap32 x1, final RoaringBitmap32 x2) { + return RoaringBitmap.intersects(x1.roaringBitmap, x2.roaringBitmap); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index dc9954367ab1..eadb37071280 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -864,6 +864,9 @@ public void testDeletionVectorsWithParquetFilter() throws Exception { List messages = write.prepareCommit(); BatchTableCommit commit = writeBuilder.newCommit(); commit.commit(messages); + write.close(); + commit.close(); + write = (BatchTableWrite) writeBuilder @@ -883,6 +886,8 @@ public void testDeletionVectorsWithParquetFilter() throws Exception { messages = write.prepareCommit(); commit = writeBuilder.newCommit(); commit.commit(messages); + write.close(); + commit.close(); PredicateBuilder builder = new PredicateBuilder(ROW_TYPE); List splits = toSplits(table.newSnapshotReader().read().dataSplits()); diff --git 
a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index e8f7e500c987..f57624d374c6 100644 --- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -375,7 +375,6 @@ private List filterRowGroups(List blocks) throws I blocks = RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this); } - int size = blocks.size(); if (selection != null) { blocks = blocks.stream() @@ -403,9 +402,6 @@ private List filterRowGroups(List blocks) throws I }) .collect(Collectors.toList()); } - if (size > blocks.size()) { - System.out.println("filter row group: " + (size - blocks.size())); - } } return blocks; diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java index 307c826df230..619b15144d22 100644 --- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java @@ -18,7 +18,6 @@ package org.apache.parquet.internal.filter2.columnindex; -import org.apache.paimon.fileindex.FileIndexResult; import org.apache.paimon.utils.RoaringBitmap32; import org.apache.parquet.filter2.compat.FilterCompat.Filter; @@ -162,10 +161,7 @@ public static RowRanges create( return ranges; } - /** - * Support using the {@link FileIndexResult} and the deletion vector result to filter the row - * ranges. - */ + /** Support using the selected position or the deleted position to filter the row ranges. 
*/ public static RowRanges create( long rowCount, long rowIndexOffset, @@ -173,36 +169,30 @@ public static RowRanges create( OffsetIndex offsetIndex, @Nullable RoaringBitmap32 selection, @Nullable RoaringBitmap32 deletion) { - int cnt = 0; RowRanges ranges = new RowRanges(); while (pageIndexes.hasNext()) { int pageIndex = pageIndexes.nextInt(); long firstRowIndex = offsetIndex.getFirstRowIndex(pageIndex); long lastRowIndex = offsetIndex.getLastRowIndex(pageIndex, rowCount); + long first = rowIndexOffset + firstRowIndex; + long last = rowIndexOffset + lastRowIndex; if (selection != null) { - RoaringBitmap32 range = - RoaringBitmap32.bitmapOfRange( - rowIndexOffset + firstRowIndex, rowIndexOffset + lastRowIndex + 1); + RoaringBitmap32 range = RoaringBitmap32.bitmapOfRange(first, last + 1); if (!RoaringBitmap32.intersects(selection, range)) { - cnt += 1; continue; } - firstRowIndex = selection.nextValue((int) (rowIndexOffset + firstRowIndex)); - lastRowIndex = selection.previousValue((int) (rowIndexOffset + firstRowIndex + 1)); + firstRowIndex = selection.nextValue((int) first) - rowIndexOffset; + lastRowIndex = selection.previousValue((int) (last)) - rowIndexOffset; } else if (deletion != null) { - RoaringBitmap32 range = - RoaringBitmap32.bitmapOfRange( - rowIndexOffset + firstRowIndex, rowIndexOffset + lastRowIndex + 1); + RoaringBitmap32 range = RoaringBitmap32.bitmapOfRange(first, last + 1); if (deletion.contains(range)) { - cnt += 1; continue; } } ranges.add(new Range(firstRowIndex, lastRowIndex)); } - if (cnt > 0) System.out.println("filter row ranges: " + cnt); return ranges; } @@ -383,15 +373,3 @@ public String toString() { return ranges.toString(); } } - -//read | Best/Avg Time(ms) | Row Rate(K/s) | Per Row(ns)| Relative | filter row groups | filter row ranges -//--|--|--|--|--|--|--| -//normal-10000000-800000-788897 | 16168 / 16287 | 185.6 | 5389.3 | 1.0X | 0 | 0 -//dv-push-down-10000000-800000-788897 | 16020 / 16591 | 187.3 | 5340.0 | 1.0X | 0 | 0 
-//index-push-down-10000000-800000-788897 | 819 / 857 | 3662.2 | 273.1 | 19.7X | 0 | 263 -//dv-and-index-push-down-10000000-800000-788897 | 788 / 1123 | 3806.2 | 262.7 | 20.5X | 0 | 263 -// -// 185.6 5389.3 1.0X -// 187.3 5340.0 1.0X -// 3662.2 273.1 19.7X -// 3806.2 262.7 20.5X From d0e0bef9c2653126b4c86085b06f696fd4a41a3d Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Fri, 3 Jan 2025 18:43:01 +0800 Subject: [PATCH 07/17] fix --- .../columnindex/ColumnIndexFilter.java | 30 +++++++++---------- .../filter2/columnindex/RowRanges.java | 1 + 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java index 3b11049a8739..f64b40d92e77 100644 --- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java @@ -72,21 +72,6 @@ public class ColumnIndexFilter implements Visitor { @Nullable private final RoaringBitmap32 deletion; private RowRanges allRows; - private ColumnIndexFilter( - ColumnIndexStore columnIndexStore, - Set paths, - long rowCount, - long rowIndexOffset, - @Nullable RoaringBitmap32 selection, - @Nullable RoaringBitmap32 deletion) { - this.columnIndexStore = columnIndexStore; - this.columns = paths; - this.rowCount = rowCount; - this.rowIndexOffset = rowIndexOffset; - this.selection = selection; - this.deletion = deletion; - } - /** * Calculates the row ranges containing the indexes of the rows might match the specified * filter. 
@@ -144,6 +129,21 @@ public RowRanges visit(NoOpFilter noOpFilter) { }); } + private ColumnIndexFilter( + ColumnIndexStore columnIndexStore, + Set paths, + long rowCount, + long rowIndexOffset, + @Nullable RoaringBitmap32 selection, + @Nullable RoaringBitmap32 deletion) { + this.columnIndexStore = columnIndexStore; + this.columns = paths; + this.rowCount = rowCount; + this.rowIndexOffset = rowIndexOffset; + this.selection = selection; + this.deletion = deletion; + } + private RowRanges allRows() { if (allRows == null) { allRows = RowRanges.createSingle(rowCount); diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java index 619b15144d22..aee22ef0cad3 100644 --- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java @@ -175,6 +175,7 @@ public static RowRanges create( long firstRowIndex = offsetIndex.getFirstRowIndex(pageIndex); long lastRowIndex = offsetIndex.getLastRowIndex(pageIndex, rowCount); + // using selected position or deletion position to filter or narrow the row ranges long first = rowIndexOffset + firstRowIndex; long last = rowIndexOffset + lastRowIndex; if (selection != null) { From 9463551f027926d91f26d184d10d94b8a8748e99 Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Mon, 6 Jan 2025 11:32:05 +0800 Subject: [PATCH 08/17] merge selection and deletion --- .../paimon/format/FormatReaderContext.java | 11 +++-- .../paimon/format/FormatReaderFactory.java | 3 +- .../apache/paimon/utils/RoaringBitmap32.java | 20 ++++------ .../paimon/operation/RawFileSplitRead.java | 13 ++++-- ...CompactedChangelogFormatReaderFactory.java | 5 +-- .../org/apache/orc/impl/RecordReaderImpl.java | 21 ++++------ .../paimon/format/orc/OrcReaderFactory.java | 17 ++++---- 
.../format/parquet/ParquetReaderFactory.java | 2 +- .../parquet/hadoop/ParquetFileReader.java | 40 +++++-------------- .../filter2/columnindex/RowRanges.java | 8 ++-- 10 files changed, 55 insertions(+), 85 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java index 3858552a6712..28dc648930f8 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java @@ -18,7 +18,6 @@ package org.apache.paimon.format; -import org.apache.paimon.fileindex.FileIndexResult; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.reader.RecordReader; @@ -32,7 +31,7 @@ public class FormatReaderContext implements FormatReaderFactory.Context { private final FileIO fileIO; private final Path file; private final long fileSize; - @Nullable private final FileIndexResult fileIndexResult; + @Nullable private final RoaringBitmap32 selection; @Nullable private final RoaringBitmap32 deletion; public FormatReaderContext(FileIO fileIO, Path file, long fileSize) { @@ -43,12 +42,12 @@ public FormatReaderContext( FileIO fileIO, Path file, long fileSize, - @Nullable FileIndexResult fileIndexResult, + @Nullable RoaringBitmap32 selection, @Nullable RoaringBitmap32 deletion) { this.fileIO = fileIO; this.file = file; this.fileSize = fileSize; - this.fileIndexResult = fileIndexResult; + this.selection = selection; this.deletion = deletion; } @@ -69,8 +68,8 @@ public long fileSize() { @Nullable @Override - public FileIndexResult fileIndex() { - return fileIndexResult; + public RoaringBitmap32 selection() { + return selection; } @Nullable diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java index 
2c6470af9e8c..a53f1623ee50 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java @@ -19,7 +19,6 @@ package org.apache.paimon.format; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.fileindex.FileIndexResult; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.reader.FileRecordReader; @@ -45,7 +44,7 @@ interface Context { long fileSize(); @Nullable - FileIndexResult fileIndex(); + RoaringBitmap32 selection(); @Nullable RoaringBitmap32 deletion(); diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java index 9e0035bb49dc..b4a19241c863 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java @@ -72,10 +72,6 @@ public long getCardinality() { return roaringBitmap.getLongCardinality(); } - public long rangeCardinality(long start, long end) { - return roaringBitmap.rangeCardinality(start, end); - } - public int first() { return roaringBitmap.first(); } @@ -92,6 +88,14 @@ public long previousValue(int fromValue) { return roaringBitmap.previousValue(fromValue); } + public boolean intersects(long minimum, long supremum) { + return roaringBitmap.intersects(minimum, supremum); + } + + public boolean contains(long minimum, long supremum) { + return roaringBitmap.contains(minimum, supremum); + } + public RoaringBitmap32 clone() { return new RoaringBitmap32(roaringBitmap.clone()); } @@ -154,10 +158,6 @@ public static RoaringBitmap32 bitmapOf(int... 
dat) { return roaringBitmap32; } - public static RoaringBitmap32 bitmapOfRange(long min, long max) { - return new RoaringBitmap32(RoaringBitmap.bitmapOfRange(min, max)); - } - public static RoaringBitmap32 and(final RoaringBitmap32 x1, final RoaringBitmap32 x2) { return new RoaringBitmap32(RoaringBitmap.and(x1.roaringBitmap, x2.roaringBitmap)); } @@ -185,8 +185,4 @@ public RoaringBitmap next() { public static RoaringBitmap32 andNot(final RoaringBitmap32 x1, final RoaringBitmap32 x2) { return new RoaringBitmap32(RoaringBitmap.andNot(x1.roaringBitmap, x2.roaringBitmap)); } - - public static boolean intersects(final RoaringBitmap32 x1, final RoaringBitmap32 x2) { - return RoaringBitmap.intersects(x1.roaringBitmap, x2.roaringBitmap); - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index 1af0a88a0c80..e024f73db928 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -210,15 +210,20 @@ private FileRecordReader createFileReader( } } + RoaringBitmap32 selection = null; + if (fileIndexResult instanceof BitmapIndexResult) { + selection = ((BitmapIndexResult) fileIndexResult).get(); + } + RoaringBitmap32 deletion = null; DeletionVector deletionVector = dvFactory == null ? 
null : dvFactory.get(); if (deletionVector instanceof BitmapDeletionVector) { deletion = ((BitmapDeletionVector) deletionVector).get(); } - if (deletion != null && fileIndexResult instanceof BitmapIndexResult) { - RoaringBitmap32 selection = ((BitmapIndexResult) fileIndexResult).get(); - if (RoaringBitmap32.andNot(selection, deletion).isEmpty()) { + if (deletion != null && selection != null) { + selection = RoaringBitmap32.andNot(selection, deletion); + if (selection.isEmpty()) { return new EmptyFileRecordReader<>(); } } @@ -228,7 +233,7 @@ private FileRecordReader createFileReader( fileIO, dataFilePathFactory.toPath(file), file.fileSize(), - fileIndexResult, + selection, deletion); FileRecordReader fileRecordReader = new DataFileRecordReader( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java index 9a5753311aef..2fa6b3cdb1bf 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java @@ -20,7 +20,6 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.fileindex.FileIndexResult; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; @@ -84,8 +83,8 @@ public long fileSize() { } @Override - public FileIndexResult fileIndex() { - return context.fileIndex(); + public RoaringBitmap32 selection() { + return context.selection(); } @Override diff --git a/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java 
b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java index 93aa0719caea..4c821185d7d8 100644 --- a/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java +++ b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java @@ -18,8 +18,6 @@ package org.apache.orc.impl; -import org.apache.paimon.fileindex.FileIndexResult; -import org.apache.paimon.fileindex.bitmap.BitmapIndexResult; import org.apache.paimon.utils.RoaringBitmap32; import org.apache.commons.lang3.ArrayUtils; @@ -129,7 +127,8 @@ public class RecordReaderImpl implements RecordReader { private final boolean noSelectedVector; // identifies whether the file has bad bloom filters that we should not use. private final boolean skipBloomFilters; - @Nullable private final FileIndexResult fileIndexResult; + @Nullable private final RoaringBitmap32 selection; + static final String[] BAD_CPP_BLOOM_FILTER_VERSIONS = { "1.6.0", "1.6.1", "1.6.2", "1.6.3", "1.6.4", "1.6.5", "1.6.6", "1.6.7", "1.6.8", "1.6.9", "1.6.10", "1.6.11", "1.7.0" @@ -226,9 +225,9 @@ public static int[] mapSargColumnsToOrcInternalColIdx( } public RecordReaderImpl( - ReaderImpl fileReader, Reader.Options options, FileIndexResult fileIndexResult) + ReaderImpl fileReader, Reader.Options options, @Nullable RoaringBitmap32 selection) throws IOException { - this.fileIndexResult = fileIndexResult; + this.selection = selection; OrcFile.WriterVersion writerVersion = fileReader.getWriterVersion(); SchemaEvolution evolution; if (options.getSchema() == null) { @@ -1278,7 +1277,7 @@ public boolean[] pickRowGroups( OrcProto.BloomFilterIndex[] bloomFilterIndices, boolean returnNone, long rowBaseInStripe, - FileIndexResult fileIndexResult) + @Nullable RoaringBitmap32 selection) throws IOException { long rowsInStripe = stripe.getNumberOfRows(); int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride); @@ -1289,10 +1288,6 @@ public boolean[] pickRowGroups( boolean hasSkipped = false; 
SearchArgument.TruthValue[] exceptionAnswer = new SearchArgument.TruthValue[leafValues.length]; - RoaringBitmap32 bitmap = null; - if (fileIndexResult instanceof BitmapIndexResult) { - bitmap = ((BitmapIndexResult) fileIndexResult).get(); - } for (int rowGroup = 0; rowGroup < result.length; ++rowGroup) { for (int pred = 0; pred < leafValues.length; ++pred) { int columnIx = filterColumns[pred]; @@ -1376,10 +1371,10 @@ public boolean[] pickRowGroups( } } result[rowGroup] = sarg.evaluate(leafValues).isNeeded(); - if (bitmap != null) { + if (selection != null) { long firstRow = rowBaseInStripe + rowIndexStride * rowGroup; long lastRow = Math.min(firstRow + rowIndexStride, firstRow + rowsInStripe); - result[rowGroup] &= bitmap.rangeCardinality(firstRow, lastRow) > 0; + result[rowGroup] &= selection.intersects(firstRow, lastRow); } hasSelected = hasSelected || result[rowGroup]; hasSkipped = hasSkipped || (!result[rowGroup]); @@ -1435,7 +1430,7 @@ protected boolean[] pickRowGroups() throws IOException { skipBloomFilters ? 
null : indexes.getBloomFilterIndex(), false, rowBaseInStripe, - fileIndexResult); + selection); } private void clearStreams() { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 6683b357fd57..fad4c7e36a66 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -24,8 +24,6 @@ import org.apache.paimon.data.columnar.ColumnarRowIterator; import org.apache.paimon.data.columnar.VectorizedColumnBatch; import org.apache.paimon.data.columnar.VectorizedRowIterator; -import org.apache.paimon.fileindex.FileIndexResult; -import org.apache.paimon.fileindex.bitmap.BitmapIndexResult; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.format.OrcFormatReaderContext; import org.apache.paimon.format.fs.HadoopReadOnlyFileSystem; @@ -39,6 +37,7 @@ import org.apache.paimon.utils.IOUtils; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Pool; +import org.apache.paimon.utils.RoaringBitmap32; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; @@ -110,7 +109,7 @@ public OrcVectorizedReader createReader(FormatReaderFactory.Context context) context.filePath(), 0, context.fileSize(), - context.fileIndex(), + context.selection(), deletionVectorsEnabled); return new OrcVectorizedReader(orcReader, poolOfBatches); } @@ -260,10 +259,10 @@ private static RecordReader createRecordReader( org.apache.paimon.fs.Path path, long splitStart, long splitLength, - @Nullable FileIndexResult fileIndexResult, + @Nullable RoaringBitmap32 selection, boolean deletionVectorsEnabled) throws IOException { - org.apache.orc.Reader orcReader = createReader(conf, fileIO, path, fileIndexResult); + org.apache.orc.Reader orcReader = createReader(conf, fileIO, path, 
selection); try { // get offset and length for the stripes that start in the split Pair offsetAndLength = @@ -278,9 +277,7 @@ private static RecordReader createRecordReader( .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf)) .tolerateMissingSchema( OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf)); - if (!conjunctPredicates.isEmpty() - && !deletionVectorsEnabled - && !(fileIndexResult instanceof BitmapIndexResult)) { + if (!conjunctPredicates.isEmpty() && !deletionVectorsEnabled && selection == null) { // row group filter push down will make row number change incorrect // so deletion vectors mode and bitmap index cannot work with row group push down options.useSelected(OrcConf.READER_USE_SELECTED.getBoolean(conf)); @@ -346,7 +343,7 @@ public static org.apache.orc.Reader createReader( org.apache.hadoop.conf.Configuration conf, FileIO fileIO, org.apache.paimon.fs.Path path, - @Nullable FileIndexResult fileIndexResult) + @Nullable RoaringBitmap32 selection) throws IOException { // open ORC file and create reader org.apache.hadoop.fs.Path hPath = new org.apache.hadoop.fs.Path(path.toUri()); @@ -359,7 +356,7 @@ public static org.apache.orc.Reader createReader( return new ReaderImpl(hPath, readerOptions) { @Override public RecordReader rows(Options options) throws IOException { - return new RecordReaderImpl(this, options, fileIndexResult); + return new RecordReaderImpl(this, options, selection); } }; } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index 9b173fe13a44..64d31aa77eea 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -120,7 +120,7 @@ public FileRecordReader createReader(FormatReaderFactory.Context co new ParquetFileReader( 
ParquetInputFile.fromPath(context.fileIO(), context.filePath()), builder.build(), - context.fileIndex(), + context.selection(), context.deletion()); MessageType fileSchema = reader.getFileMetaData().getSchema(); MessageType requestedSchema = clipParquetSchema(fileSchema); diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index f57624d374c6..e2d00464d91c 100644 --- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -18,8 +18,6 @@ package org.apache.parquet.hadoop; -import org.apache.paimon.fileindex.FileIndexResult; -import org.apache.paimon.fileindex.bitmap.BitmapIndexResult; import org.apache.paimon.format.parquet.ParquetInputFile; import org.apache.paimon.format.parquet.ParquetInputStream; import org.apache.paimon.fs.FileRange; @@ -237,13 +235,15 @@ private static ParquetMetadata readFooter( public ParquetFileReader( InputFile file, ParquetReadOptions options, - @Nullable FileIndexResult fileIndexResult, + @Nullable RoaringBitmap32 selection, @Nullable RoaringBitmap32 deletion) throws IOException { this.converter = new ParquetMetadataConverter(options); this.file = (ParquetInputFile) file; this.f = this.file.newStream(); this.options = options; + this.selection = selection; + this.deletion = deletion; try { this.footer = readFooter(file, options, f, converter); } catch (Exception e) { @@ -259,16 +259,6 @@ public ParquetFileReader( this.fileDecryptor = null; // Plaintext file. 
No need in decryptor } - RoaringBitmap32 selection = null; - if (fileIndexResult instanceof BitmapIndexResult) { - selection = ((BitmapIndexResult) fileIndexResult).get(); - } - if (selection != null && deletion != null) { - selection = RoaringBitmap32.andNot(selection, deletion); - } - this.selection = selection; - this.deletion = deletion; - try { this.blocks = filterRowGroups(footer.getBlocks()); this.blocksFiltered = this.blocks.size() != footer.getBlocks().size(); @@ -379,27 +369,19 @@ private List filterRowGroups(List blocks) throws I blocks = blocks.stream() .filter( - it -> { - long rowIndexOffset = it.getRowIndexOffset(); - RoaringBitmap32 range = - RoaringBitmap32.bitmapOfRange( - rowIndexOffset, - rowIndexOffset + it.getRowCount()); - return RoaringBitmap32.intersects(selection, range); - }) + it -> + selection.intersects( + it.getRowIndexOffset(), + it.getRowIndexOffset() + it.getRowCount())) .collect(Collectors.toList()); } else if (deletion != null) { blocks = blocks.stream() .filter( - it -> { - long rowIndexOffset = it.getRowIndexOffset(); - RoaringBitmap32 range = - RoaringBitmap32.bitmapOfRange( - rowIndexOffset, - rowIndexOffset + it.getRowCount()); - return !deletion.contains(range); - }) + it -> + !deletion.contains( + it.getRowIndexOffset(), + it.getRowIndexOffset() + it.getRowCount())) .collect(Collectors.toList()); } } diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java index aee22ef0cad3..6da21e252849 100644 --- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java @@ -175,19 +175,17 @@ public static RowRanges create( long firstRowIndex = offsetIndex.getFirstRowIndex(pageIndex); long lastRowIndex = offsetIndex.getLastRowIndex(pageIndex, rowCount); - // using 
selected position or deletion position to filter or narrow the row ranges + // using selected position or deleted position to filter or narrow the row ranges long first = rowIndexOffset + firstRowIndex; long last = rowIndexOffset + lastRowIndex; if (selection != null) { - RoaringBitmap32 range = RoaringBitmap32.bitmapOfRange(first, last + 1); - if (!RoaringBitmap32.intersects(selection, range)) { + if (!selection.intersects(first, last + 1)) { continue; } firstRowIndex = selection.nextValue((int) first) - rowIndexOffset; lastRowIndex = selection.previousValue((int) (last)) - rowIndexOffset; } else if (deletion != null) { - RoaringBitmap32 range = RoaringBitmap32.bitmapOfRange(first, last + 1); - if (deletion.contains(range)) { + if (deletion.contains(first, last + 1)) { continue; } } From 12bb4aa2ff26b28e172ed23b6611e5b493c1f644 Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Mon, 6 Jan 2025 11:41:08 +0800 Subject: [PATCH 09/17] merge file index and deletion bitmap --- .../main/java/org/apache/paimon/utils/RoaringBitmap32.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java index b4a19241c863..a7f3c3ba1240 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java @@ -60,10 +60,6 @@ public boolean contains(int x) { return roaringBitmap.contains(x); } - public boolean contains(RoaringBitmap32 contains) { - return roaringBitmap.contains(contains.roaringBitmap); - } - public boolean isEmpty() { return roaringBitmap.isEmpty(); } From e481835baa36c428cc1d2a5fbb82f1fc7c274cac Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Mon, 6 Jan 2025 13:11:32 +0800 Subject: [PATCH 10/17] remove clone --- .../paimon/deletionvectors/BitmapDeletionVector.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git 
a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java index 40a509c54fbb..2ef31986e8a9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java @@ -93,8 +93,13 @@ public byte[] serializeToBytes() { } } + /** + *

Note: the result is read only, do not call any modify operation outside. + * + * @return the deleted position + */ public RoaringBitmap32 get() { - return roaringBitmap.clone(); + return roaringBitmap; } public static DeletionVector deserializeFromByteBuffer(ByteBuffer buffer) throws IOException { From 8ef690a9b61554867c4a8c02f503008a27af4477 Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Mon, 6 Jan 2025 13:16:04 +0800 Subject: [PATCH 11/17] mvn spotless:apply --- .../org/apache/paimon/deletionvectors/BitmapDeletionVector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java index 2ef31986e8a9..37b5fdde123f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java @@ -94,7 +94,7 @@ public byte[] serializeToBytes() { } /** - *

Note: the result is read only, do not call any modify operation outside. + * Note: the result is read only, do not call any modify operation outside. * * @return the deleted position */ From a87bbec5e14331eb2da3709ddd2d32784cf88d93 Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Mon, 6 Jan 2025 14:31:39 +0800 Subject: [PATCH 12/17] remove deletion push down --- .../paimon/format/FormatReaderContext.java | 16 +------ .../paimon/format/FormatReaderFactory.java | 3 -- .../apache/paimon/utils/RoaringBitmap32.java | 4 -- .../paimon/operation/RawFileSplitRead.java | 6 +-- .../table/PrimaryKeyFileStoreTableTest.java | 43 +++++++++---------- ...CompactedChangelogFormatReaderFactory.java | 5 --- .../format/parquet/ParquetReaderFactory.java | 3 +- .../paimon/format/parquet/ParquetUtil.java | 1 - .../parquet/hadoop/ParquetFileReader.java | 19 +------- .../columnindex/ColumnIndexFilter.java | 14 ++---- .../filter2/columnindex/RowRanges.java | 23 ++++------ 11 files changed, 39 insertions(+), 98 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java index 28dc648930f8..053acd99a49c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java @@ -32,23 +32,17 @@ public class FormatReaderContext implements FormatReaderFactory.Context { private final Path file; private final long fileSize; @Nullable private final RoaringBitmap32 selection; - @Nullable private final RoaringBitmap32 deletion; public FormatReaderContext(FileIO fileIO, Path file, long fileSize) { - this(fileIO, file, fileSize, null, null); + this(fileIO, file, fileSize, null); } public FormatReaderContext( - FileIO fileIO, - Path file, - long fileSize, - @Nullable RoaringBitmap32 selection, - @Nullable RoaringBitmap32 deletion) { + FileIO fileIO, Path file, long fileSize, 
@Nullable RoaringBitmap32 selection) { this.fileIO = fileIO; this.file = file; this.fileSize = fileSize; this.selection = selection; - this.deletion = deletion; } @Override @@ -71,10 +65,4 @@ public long fileSize() { public RoaringBitmap32 selection() { return selection; } - - @Nullable - @Override - public RoaringBitmap32 deletion() { - return deletion; - } } diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java index a53f1623ee50..8e64cc63a4c0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java @@ -45,8 +45,5 @@ interface Context { @Nullable RoaringBitmap32 selection(); - - @Nullable - RoaringBitmap32 deletion(); } } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java index a7f3c3ba1240..b1f58b47d00c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java @@ -88,10 +88,6 @@ public boolean intersects(long minimum, long supremum) { return roaringBitmap.intersects(minimum, supremum); } - public boolean contains(long minimum, long supremum) { - return roaringBitmap.contains(minimum, supremum); - } - public RoaringBitmap32 clone() { return new RoaringBitmap32(roaringBitmap.clone()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index e024f73db928..aea35d71980b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -230,11 +230,7 @@ private FileRecordReader createFileReader( 
FormatReaderContext formatReaderContext = new FormatReaderContext( - fileIO, - dataFilePathFactory.toPath(file), - file.fileSize(), - selection, - deletion); + fileIO, dataFilePathFactory.toPath(file), file.fileSize(), selection); FileRecordReader fileRecordReader = new DataFileRecordReader( formatReaderMapping.getReaderFactory(), diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index eadb37071280..9427ed61c9b9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -864,38 +864,31 @@ public void testDeletionVectorsWithParquetFilter() throws Exception { List messages = write.prepareCommit(); BatchTableCommit commit = writeBuilder.newCommit(); commit.commit(messages); - write.close(); - commit.close(); - write = (BatchTableWrite) writeBuilder .newWrite() .withIOManager(new IOManagerImpl(tempDir.toString())); - - // test row ranges filter - for (int i = 1000; i < 6000; i++) { + for (int i = 110000; i < 115000; i++) { write.write(rowDataWithKind(RowKind.DELETE, 1, i, i * 100L)); } - // test deletion vector filter row group - for (int i = 93421; i < 187795; i++) { + for (int i = 130000; i < 135000; i++) { write.write(rowDataWithKind(RowKind.DELETE, 1, i, i * 100L)); } messages = write.prepareCommit(); commit = writeBuilder.newCommit(); commit.commit(messages); - write.close(); - commit.close(); PredicateBuilder builder = new PredicateBuilder(ROW_TYPE); List splits = toSplits(table.newSnapshotReader().read().dataSplits()); Random random = new Random(); // point filter + for (int i = 0; i < 10; i++) { - int value = 6000 + random.nextInt(93421 - 6000); + int value = random.nextInt(110000); TableRead read = table.newRead().withFilter(builder.equal(1, value)).executeFilter(); assertThat(getResult(read, splits, 
BATCH_ROW_TO_STRING)) .isEqualTo( @@ -906,7 +899,7 @@ public void testDeletionVectorsWithParquetFilter() throws Exception { } for (int i = 0; i < 10; i++) { - int value = 93421 + random.nextInt(187795 - 93421); + int value = 130000 + random.nextInt(5000); TableRead read = table.newRead().withFilter(builder.equal(1, value)).executeFilter(); assertThat(getResult(read, splits, BATCH_ROW_TO_STRING)).isEmpty(); } @@ -915,25 +908,29 @@ public void testDeletionVectorsWithParquetFilter() throws Exception { table.newRead() .withFilter( PredicateBuilder.and( - builder.greaterOrEqual(1, 90000), - builder.lessThan(1, 200000))) + builder.greaterOrEqual(1, 100000), + builder.lessThan(1, 150000))) .executeFilter(); List result = getResult(tableRead, splits, BATCH_ROW_TO_STRING); - assertThat(result.size()).isEqualTo((200000 - 90000) - (187795 - 93421)); + assertThat(result.size()).isEqualTo(40000); // filter 10000 assertThat(result) - .doesNotContain("1|93421|9342100|binary|varbinary|mapKey:mapVal|multiset"); + .doesNotContain("1|110000|11000000|binary|varbinary|mapKey:mapVal|multiset"); assertThat(result) - .doesNotContain("1|187794|18779400|binary|varbinary|mapKey:mapVal|multiset"); + .doesNotContain("1|114999|11499900|binary|varbinary|mapKey:mapVal|multiset"); assertThat(result) - .doesNotContain("1|200000|20000000|binary|varbinary|mapKey:mapVal|multiset"); - - assertThat(result).contains("1|199999|19999900|binary|varbinary|mapKey:mapVal|multiset"); - assertThat(result).contains("1|90000|9000000|binary|varbinary|mapKey:mapVal|multiset"); - assertThat(result).contains("1|187795|18779500|binary|varbinary|mapKey:mapVal|multiset"); - assertThat(result).contains("1|93420|9342000|binary|varbinary|mapKey:mapVal|multiset"); + .doesNotContain("1|130000|13000000|binary|varbinary|mapKey:mapVal|multiset"); + assertThat(result) + .doesNotContain("1|134999|13499900|binary|varbinary|mapKey:mapVal|multiset"); + 
assertThat(result).contains("1|100000|10000000|binary|varbinary|mapKey:mapVal|multiset"); + assertThat(result).contains("1|149999|14999900|binary|varbinary|mapKey:mapVal|multiset"); + + assertThat(result).contains("1|101099|10109900|binary|varbinary|mapKey:mapVal|multiset"); + assertThat(result).contains("1|115000|11500000|binary|varbinary|mapKey:mapVal|multiset"); + assertThat(result).contains("1|129999|12999900|binary|varbinary|mapKey:mapVal|multiset"); + assertThat(result).contains("1|135000|13500000|binary|varbinary|mapKey:mapVal|multiset"); } @Test diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java index 2fa6b3cdb1bf..cd4a1d8c3d87 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java @@ -86,11 +86,6 @@ public long fileSize() { public RoaringBitmap32 selection() { return context.selection(); } - - @Override - public RoaringBitmap32 deletion() { - return context.deletion(); - } }); } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index 64d31aa77eea..dac8940f9078 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -120,8 +120,7 @@ public FileRecordReader createReader(FormatReaderFactory.Context co new ParquetFileReader( ParquetInputFile.fromPath(context.fileIO(), context.filePath()), builder.build(), - 
context.selection(), - context.deletion()); + context.selection()); MessageType fileSchema = reader.getFileMetaData().getSchema(); MessageType requestedSchema = clipParquetSchema(fileSchema); reader.setRequestedSchema(requestedSchema); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java index 2d30051cde3d..038c91445b1a 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java @@ -81,7 +81,6 @@ public static ParquetFileReader getParquetReader(FileIO fileIO, Path path) throw return new ParquetFileReader( ParquetInputFile.fromPath(fileIO, path), ParquetReadOptions.builder().build(), - null, null); } diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index e2d00464d91c..e480e1122270 100644 --- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -221,7 +221,6 @@ private static ParquetMetadata readFooter( private final List blockRowRanges; private final boolean blocksFiltered; @Nullable private final RoaringBitmap32 selection; - @Nullable private final RoaringBitmap32 deletion; // not final. in some cases, this may be lazily loaded for backward-compat. 
private ParquetMetadata footer; @@ -233,17 +232,13 @@ private static ParquetMetadata readFooter( private InternalFileDecryptor fileDecryptor; public ParquetFileReader( - InputFile file, - ParquetReadOptions options, - @Nullable RoaringBitmap32 selection, - @Nullable RoaringBitmap32 deletion) + InputFile file, ParquetReadOptions options, @Nullable RoaringBitmap32 selection) throws IOException { this.converter = new ParquetMetadataConverter(options); this.file = (ParquetInputFile) file; this.f = this.file.newStream(); this.options = options; this.selection = selection; - this.deletion = deletion; try { this.footer = readFooter(file, options, f, converter); } catch (Exception e) { @@ -374,15 +369,6 @@ private List filterRowGroups(List blocks) throws I it.getRowIndexOffset(), it.getRowIndexOffset() + it.getRowCount())) .collect(Collectors.toList()); - } else if (deletion != null) { - blocks = - blocks.stream() - .filter( - it -> - !deletion.contains( - it.getRowIndexOffset(), - it.getRowIndexOffset() + it.getRowCount())) - .collect(Collectors.toList()); } } @@ -773,8 +759,7 @@ private RowRanges getRowRanges(int blockIndex) { paths.keySet(), blocks.get(blockIndex).getRowCount(), blocks.get(blockIndex).getRowIndexOffset(), - selection, - deletion); + selection); blockRowRanges.set(blockIndex, rowRanges); } return rowRanges; diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java index f64b40d92e77..d8a06950fa1d 100644 --- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java @@ -69,7 +69,6 @@ public class ColumnIndexFilter implements Visitor { private final long rowCount; private final long rowIndexOffset; @Nullable private final RoaringBitmap32 selection; - @Nullable 
private final RoaringBitmap32 deletion; private RowRanges allRows; /** @@ -84,7 +83,6 @@ public class ColumnIndexFilter implements Visitor { * @param rowCount the total number of rows in the row-group * @param rowIndexOffset the offset of the row-group * @param selection the selected position; it will use to filter or narrow the row ranges - * @param deletion the deleted position; it will use to filter the row ranges * @return the ranges of the possible matching row indexes; the returned ranges will contain all * the rows if any of the required offset index is missing */ @@ -94,8 +92,7 @@ public static RowRanges calculateRowRanges( Set paths, long rowCount, long rowIndexOffset, - @Nullable RoaringBitmap32 selection, - @Nullable RoaringBitmap32 deletion) { + @Nullable RoaringBitmap32 selection) { return filter.accept( new FilterCompat.Visitor() { @Override @@ -109,8 +106,7 @@ public RowRanges visit(FilterPredicateCompat filterPredicateCompat) { paths, rowCount, rowIndexOffset, - selection, - deletion)); + selection)); } catch (MissingOffsetIndexException e) { LOGGER.info(e.getMessage()); return RowRanges.createSingle(rowCount); @@ -134,14 +130,12 @@ private ColumnIndexFilter( Set paths, long rowCount, long rowIndexOffset, - @Nullable RoaringBitmap32 selection, - @Nullable RoaringBitmap32 deletion) { + @Nullable RoaringBitmap32 selection) { this.columnIndexStore = columnIndexStore; this.columns = paths; this.rowCount = rowCount; this.rowIndexOffset = rowIndexOffset; this.selection = selection; - this.deletion = deletion; } private RowRanges allRows() { @@ -239,7 +233,7 @@ private RowRanges applyPredicate( return allRows(); } - return RowRanges.create(rowCount, rowIndexOffset, func.apply(ci), oi, selection, deletion); + return RowRanges.create(rowCount, rowIndexOffset, func.apply(ci), oi, selection); } @Override diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java 
b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java index 6da21e252849..7bf4043397ed 100644 --- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java @@ -38,12 +38,12 @@ * column index based filtering. To be used iterate over the matching row indexes to be read from a * row-group, retrieve the count of the matching rows or check overlapping of a row index range. * - *

Note: The class was copied over to support using selected position and deleted position result - * to filter or narrow the {@link RowRanges}. Added a new method {@link RowRanges#create(long, long, - * PrimitiveIterator.OfInt, OffsetIndex, RoaringBitmap32, RoaringBitmap32)} + *

Note: The class was copied over to support using selected position to filter or narrow the + * {@link RowRanges}. Added a new method {@link RowRanges#create(long, long, + * PrimitiveIterator.OfInt, OffsetIndex, RoaringBitmap32)} * * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, long, long, - * RoaringBitmap32, RoaringBitmap32) + * RoaringBitmap32) */ public class RowRanges { @@ -161,33 +161,28 @@ public static RowRanges create( return ranges; } - /** Support using the selected position or the deleted position to filter the row ranges. */ + /** Support using the selected position to filter or narrow the row ranges. */ public static RowRanges create( long rowCount, long rowIndexOffset, PrimitiveIterator.OfInt pageIndexes, OffsetIndex offsetIndex, - @Nullable RoaringBitmap32 selection, - @Nullable RoaringBitmap32 deletion) { + @Nullable RoaringBitmap32 selection) { RowRanges ranges = new RowRanges(); while (pageIndexes.hasNext()) { int pageIndex = pageIndexes.nextInt(); long firstRowIndex = offsetIndex.getFirstRowIndex(pageIndex); long lastRowIndex = offsetIndex.getLastRowIndex(pageIndex, rowCount); - // using selected position or deleted position to filter or narrow the row ranges - long first = rowIndexOffset + firstRowIndex; - long last = rowIndexOffset + lastRowIndex; + // using selected position to filter or narrow the row ranges if (selection != null) { + long first = rowIndexOffset + firstRowIndex; + long last = rowIndexOffset + lastRowIndex; if (!selection.intersects(first, last + 1)) { continue; } firstRowIndex = selection.nextValue((int) first) - rowIndexOffset; lastRowIndex = selection.previousValue((int) (last)) - rowIndexOffset; - } else if (deletion != null) { - if (deletion.contains(first, last + 1)) { - continue; - } } ranges.add(new Range(firstRowIndex, lastRowIndex)); From 9444900b0dfbdd37494441810d90818732335da8 Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Mon, 6 Jan 2025 17:32:32 +0800 Subject: [PATCH 13/17] using 
LazyField --- .../paimon/format/FormatReaderContext.java | 10 ++++--- .../paimon/format/FormatReaderFactory.java | 3 ++- .../paimon/operation/RawFileSplitRead.java | 26 +++++++++++-------- ...CompactedChangelogFormatReaderFactory.java | 3 ++- .../org/apache/orc/impl/RecordReaderImpl.java | 11 +++++--- .../paimon/format/orc/OrcReaderFactory.java | 5 ++-- .../parquet/hadoop/ParquetFileReader.java | 16 ++++++++---- .../columnindex/ColumnIndexFilter.java | 9 ++++--- .../filter2/columnindex/RowRanges.java | 15 ++++++----- 9 files changed, 60 insertions(+), 38 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java index 053acd99a49c..67f34596cbb3 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java @@ -21,6 +21,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.RoaringBitmap32; import javax.annotation.Nullable; @@ -31,14 +32,17 @@ public class FormatReaderContext implements FormatReaderFactory.Context { private final FileIO fileIO; private final Path file; private final long fileSize; - @Nullable private final RoaringBitmap32 selection; + @Nullable private final LazyField selection; public FormatReaderContext(FileIO fileIO, Path file, long fileSize) { this(fileIO, file, fileSize, null); } public FormatReaderContext( - FileIO fileIO, Path file, long fileSize, @Nullable RoaringBitmap32 selection) { + FileIO fileIO, + Path file, + long fileSize, + @Nullable LazyField selection) { this.fileIO = fileIO; this.file = file; this.fileSize = fileSize; @@ -62,7 +66,7 @@ public long fileSize() { @Nullable @Override - public RoaringBitmap32 selection() { + public LazyField selection() { return 
selection; } } diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java index 8e64cc63a4c0..4f99d3775e49 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java @@ -23,6 +23,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.reader.FileRecordReader; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.RoaringBitmap32; import javax.annotation.Nullable; @@ -44,6 +45,6 @@ interface Context { long fileSize(); @Nullable - RoaringBitmap32 selection(); + LazyField selection(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index aea35d71980b..7eb90d58a378 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -51,6 +51,7 @@ import org.apache.paimon.utils.FormatReaderMapping; import org.apache.paimon.utils.FormatReaderMapping.Builder; import org.apache.paimon.utils.IOExceptionSupplier; +import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.RoaringBitmap32; import org.slf4j.Logger; @@ -210,20 +211,23 @@ private FileRecordReader createFileReader( } } - RoaringBitmap32 selection = null; - if (fileIndexResult instanceof BitmapIndexResult) { - selection = ((BitmapIndexResult) fileIndexResult).get(); - } + BitmapIndexResult fileIndex = + fileIndexResult instanceof BitmapIndexResult + ? ((BitmapIndexResult) fileIndexResult) + : null; - RoaringBitmap32 deletion = null; DeletionVector deletionVector = dvFactory == null ? 
null : dvFactory.get(); - if (deletionVector instanceof BitmapDeletionVector) { - deletion = ((BitmapDeletionVector) deletionVector).get(); - } + LazyField deletion = + deletionVector instanceof BitmapDeletionVector + ? new LazyField<>(() -> ((BitmapDeletionVector) deletionVector).get()) + : null; - if (deletion != null && selection != null) { - selection = RoaringBitmap32.andNot(selection, deletion); - if (selection.isEmpty()) { + BitmapIndexResult selection = fileIndex; + if (fileIndex != null && deletion != null) { + selection = + new BitmapIndexResult( + () -> RoaringBitmap32.andNot(fileIndex.get(), deletion.get())); + if (!selection.remain()) { return new EmptyFileRecordReader<>(); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java index cd4a1d8c3d87..e7be3f57bd1a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java @@ -27,6 +27,7 @@ import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.RoaringBitmap32; import java.io.EOFException; @@ -83,7 +84,7 @@ public long fileSize() { } @Override - public RoaringBitmap32 selection() { + public LazyField selection() { return context.selection(); } }); diff --git a/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java index 4c821185d7d8..e56760e82d71 100644 --- 
a/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java +++ b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java @@ -18,6 +18,7 @@ package org.apache.orc.impl; +import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.RoaringBitmap32; import org.apache.commons.lang3.ArrayUtils; @@ -127,7 +128,7 @@ public class RecordReaderImpl implements RecordReader { private final boolean noSelectedVector; // identifies whether the file has bad bloom filters that we should not use. private final boolean skipBloomFilters; - @Nullable private final RoaringBitmap32 selection; + @Nullable private final LazyField selection; static final String[] BAD_CPP_BLOOM_FILTER_VERSIONS = { "1.6.0", "1.6.1", "1.6.2", "1.6.3", "1.6.4", "1.6.5", "1.6.6", "1.6.7", "1.6.8", "1.6.9", @@ -225,7 +226,9 @@ public static int[] mapSargColumnsToOrcInternalColIdx( } public RecordReaderImpl( - ReaderImpl fileReader, Reader.Options options, @Nullable RoaringBitmap32 selection) + ReaderImpl fileReader, + Reader.Options options, + @Nullable LazyField selection) throws IOException { this.selection = selection; OrcFile.WriterVersion writerVersion = fileReader.getWriterVersion(); @@ -1277,7 +1280,7 @@ public boolean[] pickRowGroups( OrcProto.BloomFilterIndex[] bloomFilterIndices, boolean returnNone, long rowBaseInStripe, - @Nullable RoaringBitmap32 selection) + @Nullable LazyField selection) throws IOException { long rowsInStripe = stripe.getNumberOfRows(); int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride); @@ -1374,7 +1377,7 @@ public boolean[] pickRowGroups( if (selection != null) { long firstRow = rowBaseInStripe + rowIndexStride * rowGroup; long lastRow = Math.min(firstRow + rowIndexStride, firstRow + rowsInStripe); - result[rowGroup] &= selection.intersects(firstRow, lastRow); + result[rowGroup] &= selection.get().intersects(firstRow, lastRow); } hasSelected = hasSelected || result[rowGroup]; hasSkipped = hasSkipped || 
(!result[rowGroup]); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index fad4c7e36a66..7a1440f79e9b 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -35,6 +35,7 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.IOUtils; +import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Pool; import org.apache.paimon.utils.RoaringBitmap32; @@ -259,7 +260,7 @@ private static RecordReader createRecordReader( org.apache.paimon.fs.Path path, long splitStart, long splitLength, - @Nullable RoaringBitmap32 selection, + @Nullable LazyField selection, boolean deletionVectorsEnabled) throws IOException { org.apache.orc.Reader orcReader = createReader(conf, fileIO, path, selection); @@ -343,7 +344,7 @@ public static org.apache.orc.Reader createReader( org.apache.hadoop.conf.Configuration conf, FileIO fileIO, org.apache.paimon.fs.Path path, - @Nullable RoaringBitmap32 selection) + @Nullable LazyField selection) throws IOException { // open ORC file and create reader org.apache.hadoop.fs.Path hPath = new org.apache.hadoop.fs.Path(path.toUri()); diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index e480e1122270..c8bb2a185f68 100644 --- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -22,6 +22,7 @@ import org.apache.paimon.format.parquet.ParquetInputStream; import org.apache.paimon.fs.FileRange; import org.apache.paimon.fs.VectoredReadable; +import org.apache.paimon.utils.LazyField; 
import org.apache.paimon.utils.RoaringBitmap32; import org.apache.hadoop.fs.Path; @@ -220,7 +221,7 @@ private static ParquetMetadata readFooter( private final List blockIndexStores; private final List blockRowRanges; private final boolean blocksFiltered; - @Nullable private final RoaringBitmap32 selection; + @Nullable private final LazyField selection; // not final. in some cases, this may be lazily loaded for backward-compat. private ParquetMetadata footer; @@ -232,7 +233,9 @@ private static ParquetMetadata readFooter( private InternalFileDecryptor fileDecryptor; public ParquetFileReader( - InputFile file, ParquetReadOptions options, @Nullable RoaringBitmap32 selection) + InputFile file, + ParquetReadOptions options, + @Nullable LazyField selection) throws IOException { this.converter = new ParquetMetadataConverter(options); this.file = (ParquetInputFile) file; @@ -365,9 +368,12 @@ private List filterRowGroups(List blocks) throws I blocks.stream() .filter( it -> - selection.intersects( - it.getRowIndexOffset(), - it.getRowIndexOffset() + it.getRowCount())) + selection + .get() + .intersects( + it.getRowIndexOffset(), + it.getRowIndexOffset() + + it.getRowCount())) .collect(Collectors.toList()); } } diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java index d8a06950fa1d..286591d4e792 100644 --- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java @@ -18,6 +18,7 @@ package org.apache.parquet.internal.filter2.columnindex; +import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.RoaringBitmap32; import org.apache.parquet.filter2.compat.FilterCompat; @@ -58,7 +59,7 @@ * therefore a {@link MissingOffsetIndexException} will be thrown from any 
{@code visit} methods if * any of the required offset indexes is missing. * - *

Note: The class was copied over to support using {@link RoaringBitmap32} to filter {@link + *

Note: The class was copied over to support using selected position to filter {@link * RowRanges}. */ public class ColumnIndexFilter implements Visitor { @@ -68,7 +69,7 @@ public class ColumnIndexFilter implements Visitor { private final Set columns; private final long rowCount; private final long rowIndexOffset; - @Nullable private final RoaringBitmap32 selection; + @Nullable private final LazyField selection; private RowRanges allRows; /** @@ -92,7 +93,7 @@ public static RowRanges calculateRowRanges( Set paths, long rowCount, long rowIndexOffset, - @Nullable RoaringBitmap32 selection) { + @Nullable LazyField selection) { return filter.accept( new FilterCompat.Visitor() { @Override @@ -130,7 +131,7 @@ private ColumnIndexFilter( Set paths, long rowCount, long rowIndexOffset, - @Nullable RoaringBitmap32 selection) { + @Nullable LazyField selection) { this.columnIndexStore = columnIndexStore; this.columns = paths; this.rowCount = rowCount; diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java index 7bf4043397ed..b022c28d3fa7 100644 --- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java @@ -18,6 +18,7 @@ package org.apache.parquet.internal.filter2.columnindex; +import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.RoaringBitmap32; import org.apache.parquet.filter2.compat.FilterCompat.Filter; @@ -40,10 +41,9 @@ * *

Note: The class was copied over to support using selected position to filter or narrow the * {@link RowRanges}. Added a new method {@link RowRanges#create(long, long, - * PrimitiveIterator.OfInt, OffsetIndex, RoaringBitmap32)} + * PrimitiveIterator.OfInt, OffsetIndex, LazyField)} * - * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, long, long, - * RoaringBitmap32) + * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, long, long, LazyField) */ public class RowRanges { @@ -167,7 +167,7 @@ public static RowRanges create( long rowIndexOffset, PrimitiveIterator.OfInt pageIndexes, OffsetIndex offsetIndex, - @Nullable RoaringBitmap32 selection) { + @Nullable LazyField selection) { RowRanges ranges = new RowRanges(); while (pageIndexes.hasNext()) { int pageIndex = pageIndexes.nextInt(); @@ -178,11 +178,12 @@ public static RowRanges create( if (selection != null) { long first = rowIndexOffset + firstRowIndex; long last = rowIndexOffset + lastRowIndex; - if (!selection.intersects(first, last + 1)) { + RoaringBitmap32 result = selection.get(); + if (!result.intersects(first, last + 1)) { continue; } - firstRowIndex = selection.nextValue((int) first) - rowIndexOffset; - lastRowIndex = selection.previousValue((int) (last)) - rowIndexOffset; + firstRowIndex = result.nextValue((int) first) - rowIndexOffset; + lastRowIndex = result.previousValue((int) (last)) - rowIndexOffset; } ranges.add(new Range(firstRowIndex, lastRowIndex)); From 918a3773e4a5899db9a8c10ff7c94f9fbd6b0e2c Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Mon, 6 Jan 2025 18:00:04 +0800 Subject: [PATCH 14/17] fix --- .../java/org/apache/paimon/operation/RawFileSplitRead.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index 7eb90d58a378..bc82bd7d62d9 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -222,12 +222,11 @@ private FileRecordReader createFileReader( ? new LazyField<>(() -> ((BitmapDeletionVector) deletionVector).get()) : null; - BitmapIndexResult selection = fileIndex; + LazyField selection = fileIndex; if (fileIndex != null && deletion != null) { selection = - new BitmapIndexResult( - () -> RoaringBitmap32.andNot(fileIndex.get(), deletion.get())); - if (!selection.remain()) { + new LazyField<>(() -> RoaringBitmap32.andNot(fileIndex.get(), deletion.get())); + if (selection.get().isEmpty()) { return new EmptyFileRecordReader<>(); } } From 7e423ca0b4ab852f2c59b95c04c6910c274d6d96 Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Mon, 6 Jan 2025 18:24:58 +0800 Subject: [PATCH 15/17] Revert "fix" This reverts commit 918a3773e4a5899db9a8c10ff7c94f9fbd6b0e2c. --- .../java/org/apache/paimon/operation/RawFileSplitRead.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index bc82bd7d62d9..7eb90d58a378 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -222,11 +222,12 @@ private FileRecordReader createFileReader( ? 
new LazyField<>(() -> ((BitmapDeletionVector) deletionVector).get()) : null; - LazyField selection = fileIndex; + BitmapIndexResult selection = fileIndex; if (fileIndex != null && deletion != null) { selection = - new LazyField<>(() -> RoaringBitmap32.andNot(fileIndex.get(), deletion.get())); - if (selection.get().isEmpty()) { + new BitmapIndexResult( + () -> RoaringBitmap32.andNot(fileIndex.get(), deletion.get())); + if (!selection.remain()) { return new EmptyFileRecordReader<>(); } } From 25b18c823d5765e50c3152ca909e6702cfcad71d Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Mon, 6 Jan 2025 18:25:09 +0800 Subject: [PATCH 16/17] Revert "using LazyField" This reverts commit 9444900b0dfbdd37494441810d90818732335da8. --- .../paimon/format/FormatReaderContext.java | 10 +++---- .../paimon/format/FormatReaderFactory.java | 3 +-- .../paimon/operation/RawFileSplitRead.java | 26 ++++++++----------- ...CompactedChangelogFormatReaderFactory.java | 3 +-- .../org/apache/orc/impl/RecordReaderImpl.java | 11 +++----- .../paimon/format/orc/OrcReaderFactory.java | 5 ++-- .../parquet/hadoop/ParquetFileReader.java | 16 ++++-------- .../columnindex/ColumnIndexFilter.java | 9 +++---- .../filter2/columnindex/RowRanges.java | 15 +++++------ 9 files changed, 38 insertions(+), 60 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java index 67f34596cbb3..053acd99a49c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java @@ -21,7 +21,6 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.RoaringBitmap32; import javax.annotation.Nullable; @@ -32,17 +31,14 @@ public class FormatReaderContext implements 
FormatReaderFactory.Context { private final FileIO fileIO; private final Path file; private final long fileSize; - @Nullable private final LazyField selection; + @Nullable private final RoaringBitmap32 selection; public FormatReaderContext(FileIO fileIO, Path file, long fileSize) { this(fileIO, file, fileSize, null); } public FormatReaderContext( - FileIO fileIO, - Path file, - long fileSize, - @Nullable LazyField selection) { + FileIO fileIO, Path file, long fileSize, @Nullable RoaringBitmap32 selection) { this.fileIO = fileIO; this.file = file; this.fileSize = fileSize; @@ -66,7 +62,7 @@ public long fileSize() { @Nullable @Override - public LazyField selection() { + public RoaringBitmap32 selection() { return selection; } } diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java index 4f99d3775e49..8e64cc63a4c0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java @@ -23,7 +23,6 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.reader.FileRecordReader; import org.apache.paimon.reader.RecordReader; -import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.RoaringBitmap32; import javax.annotation.Nullable; @@ -45,6 +44,6 @@ interface Context { long fileSize(); @Nullable - LazyField selection(); + RoaringBitmap32 selection(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index 7eb90d58a378..aea35d71980b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -51,7 +51,6 @@ import org.apache.paimon.utils.FormatReaderMapping; import 
org.apache.paimon.utils.FormatReaderMapping.Builder; import org.apache.paimon.utils.IOExceptionSupplier; -import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.RoaringBitmap32; import org.slf4j.Logger; @@ -211,23 +210,20 @@ private FileRecordReader createFileReader( } } - BitmapIndexResult fileIndex = - fileIndexResult instanceof BitmapIndexResult - ? ((BitmapIndexResult) fileIndexResult) - : null; + RoaringBitmap32 selection = null; + if (fileIndexResult instanceof BitmapIndexResult) { + selection = ((BitmapIndexResult) fileIndexResult).get(); + } + RoaringBitmap32 deletion = null; DeletionVector deletionVector = dvFactory == null ? null : dvFactory.get(); - LazyField deletion = - deletionVector instanceof BitmapDeletionVector - ? new LazyField<>(() -> ((BitmapDeletionVector) deletionVector).get()) - : null; + if (deletionVector instanceof BitmapDeletionVector) { + deletion = ((BitmapDeletionVector) deletionVector).get(); + } - BitmapIndexResult selection = fileIndex; - if (fileIndex != null && deletion != null) { - selection = - new BitmapIndexResult( - () -> RoaringBitmap32.andNot(fileIndex.get(), deletion.get())); - if (!selection.remain()) { + if (deletion != null && selection != null) { + selection = RoaringBitmap32.andNot(selection, deletion); + if (selection.isEmpty()) { return new EmptyFileRecordReader<>(); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java index e7be3f57bd1a..cd4a1d8c3d87 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java @@ -27,7 +27,6 
@@ import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.reader.FileRecordReader; -import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.RoaringBitmap32; import java.io.EOFException; @@ -84,7 +83,7 @@ public long fileSize() { } @Override - public LazyField selection() { + public RoaringBitmap32 selection() { return context.selection(); } }); diff --git a/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java index e56760e82d71..4c821185d7d8 100644 --- a/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java +++ b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java @@ -18,7 +18,6 @@ package org.apache.orc.impl; -import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.RoaringBitmap32; import org.apache.commons.lang3.ArrayUtils; @@ -128,7 +127,7 @@ public class RecordReaderImpl implements RecordReader { private final boolean noSelectedVector; // identifies whether the file has bad bloom filters that we should not use. 
private final boolean skipBloomFilters; - @Nullable private final LazyField selection; + @Nullable private final RoaringBitmap32 selection; static final String[] BAD_CPP_BLOOM_FILTER_VERSIONS = { "1.6.0", "1.6.1", "1.6.2", "1.6.3", "1.6.4", "1.6.5", "1.6.6", "1.6.7", "1.6.8", "1.6.9", @@ -226,9 +225,7 @@ public static int[] mapSargColumnsToOrcInternalColIdx( } public RecordReaderImpl( - ReaderImpl fileReader, - Reader.Options options, - @Nullable LazyField selection) + ReaderImpl fileReader, Reader.Options options, @Nullable RoaringBitmap32 selection) throws IOException { this.selection = selection; OrcFile.WriterVersion writerVersion = fileReader.getWriterVersion(); @@ -1280,7 +1277,7 @@ public boolean[] pickRowGroups( OrcProto.BloomFilterIndex[] bloomFilterIndices, boolean returnNone, long rowBaseInStripe, - @Nullable LazyField selection) + @Nullable RoaringBitmap32 selection) throws IOException { long rowsInStripe = stripe.getNumberOfRows(); int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride); @@ -1377,7 +1374,7 @@ public boolean[] pickRowGroups( if (selection != null) { long firstRow = rowBaseInStripe + rowIndexStride * rowGroup; long lastRow = Math.min(firstRow + rowIndexStride, firstRow + rowsInStripe); - result[rowGroup] &= selection.get().intersects(firstRow, lastRow); + result[rowGroup] &= selection.intersects(firstRow, lastRow); } hasSelected = hasSelected || result[rowGroup]; hasSkipped = hasSkipped || (!result[rowGroup]); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 7a1440f79e9b..fad4c7e36a66 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -35,7 +35,6 @@ import org.apache.paimon.types.DataType; import org.apache.paimon.types.RowType; import 
org.apache.paimon.utils.IOUtils; -import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Pool; import org.apache.paimon.utils.RoaringBitmap32; @@ -260,7 +259,7 @@ private static RecordReader createRecordReader( org.apache.paimon.fs.Path path, long splitStart, long splitLength, - @Nullable LazyField selection, + @Nullable RoaringBitmap32 selection, boolean deletionVectorsEnabled) throws IOException { org.apache.orc.Reader orcReader = createReader(conf, fileIO, path, selection); @@ -344,7 +343,7 @@ public static org.apache.orc.Reader createReader( org.apache.hadoop.conf.Configuration conf, FileIO fileIO, org.apache.paimon.fs.Path path, - @Nullable LazyField selection) + @Nullable RoaringBitmap32 selection) throws IOException { // open ORC file and create reader org.apache.hadoop.fs.Path hPath = new org.apache.hadoop.fs.Path(path.toUri()); diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index c8bb2a185f68..e480e1122270 100644 --- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -22,7 +22,6 @@ import org.apache.paimon.format.parquet.ParquetInputStream; import org.apache.paimon.fs.FileRange; import org.apache.paimon.fs.VectoredReadable; -import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.RoaringBitmap32; import org.apache.hadoop.fs.Path; @@ -221,7 +220,7 @@ private static ParquetMetadata readFooter( private final List blockIndexStores; private final List blockRowRanges; private final boolean blocksFiltered; - @Nullable private final LazyField selection; + @Nullable private final RoaringBitmap32 selection; // not final. in some cases, this may be lazily loaded for backward-compat. 
private ParquetMetadata footer; @@ -233,9 +232,7 @@ private static ParquetMetadata readFooter( private InternalFileDecryptor fileDecryptor; public ParquetFileReader( - InputFile file, - ParquetReadOptions options, - @Nullable LazyField selection) + InputFile file, ParquetReadOptions options, @Nullable RoaringBitmap32 selection) throws IOException { this.converter = new ParquetMetadataConverter(options); this.file = (ParquetInputFile) file; @@ -368,12 +365,9 @@ private List filterRowGroups(List blocks) throws I blocks.stream() .filter( it -> - selection - .get() - .intersects( - it.getRowIndexOffset(), - it.getRowIndexOffset() - + it.getRowCount())) + selection.intersects( + it.getRowIndexOffset(), + it.getRowIndexOffset() + it.getRowCount())) .collect(Collectors.toList()); } } diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java index 286591d4e792..d8a06950fa1d 100644 --- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java @@ -18,7 +18,6 @@ package org.apache.parquet.internal.filter2.columnindex; -import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.RoaringBitmap32; import org.apache.parquet.filter2.compat.FilterCompat; @@ -59,7 +58,7 @@ * therefore a {@link MissingOffsetIndexException} will be thrown from any {@code visit} methods if * any of the required offset indexes is missing. * - *

Note: The class was copied over to support using selected position to filter {@link + *

Note: The class was copied over to support using {@link RoaringBitmap32} to filter {@link * RowRanges}. */ public class ColumnIndexFilter implements Visitor { @@ -69,7 +68,7 @@ public class ColumnIndexFilter implements Visitor { private final Set columns; private final long rowCount; private final long rowIndexOffset; - @Nullable private final LazyField selection; + @Nullable private final RoaringBitmap32 selection; private RowRanges allRows; /** @@ -93,7 +92,7 @@ public static RowRanges calculateRowRanges( Set paths, long rowCount, long rowIndexOffset, - @Nullable LazyField selection) { + @Nullable RoaringBitmap32 selection) { return filter.accept( new FilterCompat.Visitor() { @Override @@ -131,7 +130,7 @@ private ColumnIndexFilter( Set paths, long rowCount, long rowIndexOffset, - @Nullable LazyField selection) { + @Nullable RoaringBitmap32 selection) { this.columnIndexStore = columnIndexStore; this.columns = paths; this.rowCount = rowCount; diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java index b022c28d3fa7..7bf4043397ed 100644 --- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java @@ -18,7 +18,6 @@ package org.apache.parquet.internal.filter2.columnindex; -import org.apache.paimon.utils.LazyField; import org.apache.paimon.utils.RoaringBitmap32; import org.apache.parquet.filter2.compat.FilterCompat.Filter; @@ -41,9 +40,10 @@ * *

Note: The class was copied over to support using selected position to filter or narrow the * {@link RowRanges}. Added a new method {@link RowRanges#create(long, long, - * PrimitiveIterator.OfInt, OffsetIndex, LazyField)} + * PrimitiveIterator.OfInt, OffsetIndex, RoaringBitmap32)} * - * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, long, long, LazyField) + * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, long, long, + * RoaringBitmap32) */ public class RowRanges { @@ -167,7 +167,7 @@ public static RowRanges create( long rowIndexOffset, PrimitiveIterator.OfInt pageIndexes, OffsetIndex offsetIndex, - @Nullable LazyField selection) { + @Nullable RoaringBitmap32 selection) { RowRanges ranges = new RowRanges(); while (pageIndexes.hasNext()) { int pageIndex = pageIndexes.nextInt(); @@ -178,12 +178,11 @@ public static RowRanges create( if (selection != null) { long first = rowIndexOffset + firstRowIndex; long last = rowIndexOffset + lastRowIndex; - RoaringBitmap32 result = selection.get(); - if (!result.intersects(first, last + 1)) { + if (!selection.intersects(first, last + 1)) { continue; } - firstRowIndex = result.nextValue((int) first) - rowIndexOffset; - lastRowIndex = result.previousValue((int) (last)) - rowIndexOffset; + firstRowIndex = selection.nextValue((int) first) - rowIndexOffset; + lastRowIndex = selection.previousValue((int) (last)) - rowIndexOffset; } ranges.add(new Range(firstRowIndex, lastRowIndex)); From 923daca23f99147e5623de7841966f58719aa110 Mon Sep 17 00:00:00 2001 From: tanjialiang Date: Mon, 6 Jan 2025 19:20:03 +0800 Subject: [PATCH 17/17] fix --- .../java/org/apache/paimon/operation/RawFileSplitRead.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index aea35d71980b..a6690ef66085 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -221,8 +221,10 @@ private FileRecordReader createFileReader( deletion = ((BitmapDeletionVector) deletionVector).get(); } - if (deletion != null && selection != null) { - selection = RoaringBitmap32.andNot(selection, deletion); + if (selection != null) { + if (deletion != null) { + selection = RoaringBitmap32.andNot(selection, deletion); + } if (selection.isEmpty()) { return new EmptyFileRecordReader<>(); }