diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java index cae6a977e615..053acd99a49c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderContext.java @@ -18,10 +18,10 @@ package org.apache.paimon.format; -import org.apache.paimon.fileindex.FileIndexResult; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.utils.RoaringBitmap32; import javax.annotation.Nullable; @@ -31,18 +31,18 @@ public class FormatReaderContext implements FormatReaderFactory.Context { private final FileIO fileIO; private final Path file; private final long fileSize; - @Nullable private final FileIndexResult fileIndexResult; + @Nullable private final RoaringBitmap32 selection; public FormatReaderContext(FileIO fileIO, Path file, long fileSize) { this(fileIO, file, fileSize, null); } public FormatReaderContext( - FileIO fileIO, Path file, long fileSize, @Nullable FileIndexResult fileIndexResult) { + FileIO fileIO, Path file, long fileSize, @Nullable RoaringBitmap32 selection) { this.fileIO = fileIO; this.file = file; this.fileSize = fileSize; - this.fileIndexResult = fileIndexResult; + this.selection = selection; } @Override @@ -62,7 +62,7 @@ public long fileSize() { @Nullable @Override - public FileIndexResult fileIndex() { - return fileIndexResult; + public RoaringBitmap32 selection() { + return selection; } } diff --git a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java index 5ef084ec4d34..8e64cc63a4c0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java +++ b/paimon-common/src/main/java/org/apache/paimon/format/FormatReaderFactory.java @@ -19,11 +19,11 @@ package org.apache.paimon.format; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.fileindex.FileIndexResult; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.reader.FileRecordReader; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.utils.RoaringBitmap32; import javax.annotation.Nullable; @@ -44,6 +44,6 @@ interface Context { long fileSize(); @Nullable - FileIndexResult fileIndex(); + RoaringBitmap32 selection(); } } diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java index d3b07a2362a9..b1f58b47d00c 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/RoaringBitmap32.java @@ -68,10 +68,6 @@ public long getCardinality() { return roaringBitmap.getLongCardinality(); } - public long rangeCardinality(long start, long end) { - return roaringBitmap.rangeCardinality(start, end); - } - public int first() { return roaringBitmap.first(); } @@ -80,6 +76,18 @@ public int last() { return roaringBitmap.last(); } + public long nextValue(int fromValue) { + return roaringBitmap.nextValue(fromValue); + } + + public long previousValue(int fromValue) { + return roaringBitmap.previousValue(fromValue); + } + + public boolean intersects(long minimum, long supremum) { + return roaringBitmap.intersects(minimum, supremum); + } + public RoaringBitmap32 clone() { return new RoaringBitmap32(roaringBitmap.clone()); } @@ -142,10 +150,6 @@ public static RoaringBitmap32 bitmapOf(int... dat) { return roaringBitmap32; } - public static RoaringBitmap32 bitmapOfRange(long min, long max) { - return new RoaringBitmap32(RoaringBitmap.bitmapOfRange(min, max)); - } - public static RoaringBitmap32 and(final RoaringBitmap32 x1, final RoaringBitmap32 x2) { return new RoaringBitmap32(RoaringBitmap.and(x1.roaringBitmap, x2.roaringBitmap)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java index 55e0a975e303..37b5fdde123f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java +++ b/paimon-core/src/main/java/org/apache/paimon/deletionvectors/BitmapDeletionVector.java @@ -93,6 +93,15 @@ public byte[] serializeToBytes() { } } + /** + * Note: the result is read only, do not call any modify operation outside. + * + * @return the deleted position + */ + public RoaringBitmap32 get() { + return roaringBitmap; + } + public static DeletionVector deserializeFromByteBuffer(ByteBuffer buffer) throws IOException { RoaringBitmap32 bitmap = new RoaringBitmap32(); bitmap.deserialize(buffer); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index d0f3275b5afc..a6690ef66085 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader; +import org.apache.paimon.deletionvectors.BitmapDeletionVector; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.disk.IOManager; import org.apache.paimon.fileindex.FileIndexResult; @@ -50,6 +51,7 @@ import org.apache.paimon.utils.FormatReaderMapping; import org.apache.paimon.utils.FormatReaderMapping.Builder; import org.apache.paimon.utils.IOExceptionSupplier; +import org.apache.paimon.utils.RoaringBitmap32; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -208,9 +210,29 @@ private FileRecordReader createFileReader( } } + RoaringBitmap32 selection = null; + if (fileIndexResult instanceof BitmapIndexResult) { + selection = ((BitmapIndexResult) fileIndexResult).get(); + } + + RoaringBitmap32 deletion = null; + DeletionVector deletionVector = dvFactory == null ? null : dvFactory.get(); + if (deletionVector instanceof BitmapDeletionVector) { + deletion = ((BitmapDeletionVector) deletionVector).get(); + } + + if (selection != null) { + if (deletion != null) { + selection = RoaringBitmap32.andNot(selection, deletion); + } + if (selection.isEmpty()) { + return new EmptyFileRecordReader<>(); + } + } + FormatReaderContext formatReaderContext = new FormatReaderContext( - fileIO, dataFilePathFactory.toPath(file), file.fileSize(), fileIndexResult); + fileIO, dataFilePathFactory.toPath(file), file.fileSize(), selection); FileRecordReader fileRecordReader = new DataFileRecordReader( formatReaderMapping.getReaderFactory(), @@ -225,7 +247,6 @@ private FileRecordReader createFileReader( fileRecordReader, (BitmapIndexResult) fileIndexResult); } - DeletionVector deletionVector = dvFactory == null ? null : dvFactory.get(); if (deletionVector != null && !deletionVector.isEmpty()) { return new ApplyDeletionVectorReader(fileRecordReader, deletionVector); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index fa635e2ab666..9427ed61c9b9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -31,11 +31,13 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.BundleRecords; import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.reader.RecordReader; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.SchemaUtils; @@ -58,6 +60,7 @@ import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.StreamTableScan; import org.apache.paimon.table.source.TableRead; +import org.apache.paimon.table.source.TableScan; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.table.system.AuditLogTable; import org.apache.paimon.table.system.FileMonitorTable; @@ -68,6 +71,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Pair; +import org.apache.parquet.hadoop.ParquetOutputFormat; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -85,8 +89,10 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; import java.util.function.Consumer; import java.util.function.Function; @@ -100,7 +106,7 @@ import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP; import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED; import static org.apache.paimon.CoreOptions.FILE_FORMAT; -import static org.apache.paimon.CoreOptions.FILE_INDEX_IN_MANIFEST_THRESHOLD; +import static org.apache.paimon.CoreOptions.FILE_FORMAT_PARQUET; import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE; import static org.apache.paimon.CoreOptions.MERGE_ENGINE; import static org.apache.paimon.CoreOptions.MergeEngine; @@ -964,89 +970,111 @@ public void testDeletionVectorsWithFileIndexInMeta() throws Exception { } @Test - public void testDeletionVectorsWithBitmapFileIndexInFile() throws Exception { + public void testDeletionVectorsCombineWithFileIndexPushDownParquet() throws Exception { FileStoreTable table = createFileStoreTable( conf -> { conf.set(BUCKET, 1); + conf.set(FILE_FORMAT, FILE_FORMAT_PARQUET); conf.set(DELETION_VECTORS_ENABLED, true); - conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1)); - conf.set(FILE_INDEX_IN_MANIFEST_THRESHOLD, MemorySize.ofBytes(1)); + conf.set(ParquetOutputFormat.BLOCK_SIZE, "524288"); + conf.set(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100"); + conf.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300"); conf.set("file-index.bitmap.columns", "b"); }); - StreamTableWrite write = - table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString())); - StreamTableCommit commit = table.newCommit(commitUser); - - write.write(rowData(1, 1, 300L)); - write.write(rowData(1, 2, 400L)); - write.write(rowData(1, 3, 100L)); - write.write(rowData(1, 4, 100L)); - commit.commit(0, write.prepareCommit(true, 0)); - - write.write(rowData(1, 1, 100L)); - write.write(rowData(1, 2, 100L)); - write.write(rowData(1, 3, 300L)); - write.write(rowData(1, 5, 100L)); - commit.commit(1, write.prepareCommit(true, 1)); - - write.write(rowData(1, 4, 200L)); - commit.commit(2, write.prepareCommit(true, 2)); - - PredicateBuilder builder = new PredicateBuilder(ROW_TYPE); - List splits = toSplits(table.newSnapshotReader().read().dataSplits()); - assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2); - TableRead read = table.newRead().withFilter(builder.equal(2, 100L)); - assertThat(getResult(read, splits, BATCH_ROW_TO_STRING)) - .hasSameElementsAs( - Arrays.asList( - "1|1|100|binary|varbinary|mapKey:mapVal|multiset", - "1|2|100|binary|varbinary|mapKey:mapVal|multiset", - "1|5|100|binary|varbinary|mapKey:mapVal|multiset")); - } - - @Test - public void testDeletionVectorsWithBitmapFileIndexInMeta() throws Exception { - FileStoreTable table = - createFileStoreTable( - conf -> { - conf.set(BUCKET, 1); - conf.set(DELETION_VECTORS_ENABLED, true); - conf.set(TARGET_FILE_SIZE, MemorySize.ofBytes(1)); - conf.set(FILE_INDEX_IN_MANIFEST_THRESHOLD, MemorySize.ofMebiBytes(1)); - conf.set("file-index.bitmap.columns", "b"); - }); + BatchWriteBuilder batchWriteBuilder = table.newBatchWriteBuilder(); + BatchTableWrite batchWrite; + BatchTableCommit batchCommit; - StreamTableWrite write = - table.newWrite(commitUser).withIOManager(new IOManagerImpl(tempDir.toString())); - StreamTableCommit commit = table.newCommit(commitUser); - - write.write(rowData(1, 1, 300L)); - write.write(rowData(1, 2, 400L)); - write.write(rowData(1, 3, 100L)); - write.write(rowData(1, 4, 100L)); - commit.commit(0, write.prepareCommit(true, 0)); - - write.write(rowData(1, 1, 100L)); - write.write(rowData(1, 2, 100L)); - write.write(rowData(1, 3, 300L)); - write.write(rowData(1, 5, 100L)); - commit.commit(1, write.prepareCommit(true, 1)); - - write.write(rowData(1, 4, 200L)); - commit.commit(2, write.prepareCommit(true, 2)); + // test bitmap index and deletion vector merge and using EmptyFileRecordReader to read + batchWrite = + (BatchTableWrite) + batchWriteBuilder + .newWrite() + .withIOManager(new IOManagerImpl(tempDir.toString())); + batchCommit = batchWriteBuilder.newCommit(); + batchWrite.write(rowData(1, 1, 300L)); + batchWrite.write(rowData(1, 2, 400L)); + batchWrite.write(rowData(1, 3, 100L)); + batchWrite.write(rowData(1, 4, 100L)); + batchCommit.commit(batchWrite.prepareCommit()); + batchWrite.close(); + batchCommit.close(); + + batchWrite = + (BatchTableWrite) + batchWriteBuilder + .newWrite() + .withIOManager(new IOManagerImpl(tempDir.toString())); + batchCommit = batchWriteBuilder.newCommit(); + batchWrite.write(rowDataWithKind(RowKind.DELETE, 1, 1, 300L)); + batchCommit.commit(batchWrite.prepareCommit()); + batchWrite.close(); + batchCommit.close(); + + Predicate predicate = new PredicateBuilder(table.rowType()).equal(2, 300L); + TableScan.Plan plan = table.newScan().plan(); + RecordReader reader = + table.newRead().withFilter(predicate).createReader(plan.splits()); + AtomicInteger cnt = new AtomicInteger(0); + reader.forEachRemaining(row -> cnt.incrementAndGet()); + assertThat(cnt.get()).isEqualTo(0); + reader.close(); + + // truncate table + FileStoreCommit truncateCommit = table.store().newCommit(UUID.randomUUID().toString()); + truncateCommit.truncateTable(2); + truncateCommit.close(); + + // test parquet row ranges filtering + int bound = 100; + Random random = new Random(); + Map expectedMap = new HashMap<>(); + batchWrite = + (BatchTableWrite) + batchWriteBuilder + .newWrite() + .withIOManager(new IOManagerImpl(tempDir.toString())); + batchCommit = batchWriteBuilder.newCommit(); + for (int i = 0; i < 100000; i++) { + long next = random.nextInt(bound); + expectedMap.put(i, next); + batchWrite.write(rowData(1, i, next)); + } + batchCommit.commit(batchWrite.prepareCommit()); + batchWrite.close(); + batchCommit.close(); - PredicateBuilder builder = new PredicateBuilder(ROW_TYPE); - List splits = toSplits(table.newSnapshotReader().read().dataSplits()); - assertThat(((DataSplit) splits.get(0)).dataFiles().size()).isEqualTo(2); - TableRead read = table.newRead().withFilter(builder.equal(2, 100L)); - assertThat(getResult(read, splits, BATCH_ROW_TO_STRING)) - .hasSameElementsAs( - Arrays.asList( - "1|1|100|binary|varbinary|mapKey:mapVal|multiset", - "1|2|100|binary|varbinary|mapKey:mapVal|multiset", - "1|5|100|binary|varbinary|mapKey:mapVal|multiset")); + batchWrite = + (BatchTableWrite) + batchWriteBuilder + .newWrite() + .withIOManager(new IOManagerImpl(tempDir.toString())); + batchCommit = batchWriteBuilder.newCommit(); + for (int i = 25000; i < 50000; i++) { + batchWrite.write(rowDataWithKind(RowKind.DELETE, 1, i, expectedMap.remove(i))); + } + batchCommit.commit(batchWrite.prepareCommit()); + batchWrite.close(); + batchCommit.close(); + + for (int i = 0; i < bound; i++) { + long next = i; + predicate = new PredicateBuilder(table.rowType()).equal(2, next); + plan = table.newScan().plan(); + reader = table.newRead().withFilter(predicate).createReader(plan.splits()); + AtomicLong expectedCnt = new AtomicLong(0); + reader.forEachRemaining( + row -> { + expectedCnt.incrementAndGet(); + assertThat(row.getLong(2)).isEqualTo(expectedMap.get(row.getInt(1))); + }); + long count = + expectedMap.entrySet().stream().filter(x -> x.getValue().equals(next)).count(); + assertThat(expectedCnt.get()).isEqualTo(count); + reader.close(); + } } @Test diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java index e0aed448db93..cd4a1d8c3d87 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java @@ -20,7 +20,6 @@ import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.fileindex.FileIndexResult; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; @@ -28,6 +27,7 @@ import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.utils.RoaringBitmap32; import java.io.EOFException; import java.io.IOException; @@ -83,8 +83,8 @@ public long fileSize() { } @Override - public FileIndexResult fileIndex() { - return context.fileIndex(); + public RoaringBitmap32 selection() { + return context.selection(); } }); } diff --git a/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java index 93aa0719caea..4c821185d7d8 100644 --- a/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java +++ b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java @@ -18,8 +18,6 @@ package org.apache.orc.impl; -import org.apache.paimon.fileindex.FileIndexResult; -import org.apache.paimon.fileindex.bitmap.BitmapIndexResult; import org.apache.paimon.utils.RoaringBitmap32; import org.apache.commons.lang3.ArrayUtils; @@ -129,7 +127,8 @@ public class RecordReaderImpl implements RecordReader { private final boolean noSelectedVector; // identifies whether the file has bad bloom filters that we should not use. private final boolean skipBloomFilters; - @Nullable private final FileIndexResult fileIndexResult; + @Nullable private final RoaringBitmap32 selection; + static final String[] BAD_CPP_BLOOM_FILTER_VERSIONS = { "1.6.0", "1.6.1", "1.6.2", "1.6.3", "1.6.4", "1.6.5", "1.6.6", "1.6.7", "1.6.8", "1.6.9", "1.6.10", "1.6.11", "1.7.0" @@ -226,9 +225,9 @@ public static int[] mapSargColumnsToOrcInternalColIdx( } public RecordReaderImpl( - ReaderImpl fileReader, Reader.Options options, FileIndexResult fileIndexResult) + ReaderImpl fileReader, Reader.Options options, @Nullable RoaringBitmap32 selection) throws IOException { - this.fileIndexResult = fileIndexResult; + this.selection = selection; OrcFile.WriterVersion writerVersion = fileReader.getWriterVersion(); SchemaEvolution evolution; if (options.getSchema() == null) { @@ -1278,7 +1277,7 @@ public boolean[] pickRowGroups( OrcProto.BloomFilterIndex[] bloomFilterIndices, boolean returnNone, long rowBaseInStripe, - FileIndexResult fileIndexResult) + @Nullable RoaringBitmap32 selection) throws IOException { long rowsInStripe = stripe.getNumberOfRows(); int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride); @@ -1289,10 +1288,6 @@ public boolean[] pickRowGroups( boolean hasSkipped = false; SearchArgument.TruthValue[] exceptionAnswer = new SearchArgument.TruthValue[leafValues.length]; - RoaringBitmap32 bitmap = null; - if (fileIndexResult instanceof BitmapIndexResult) { - bitmap = ((BitmapIndexResult) fileIndexResult).get(); - } for (int rowGroup = 0; rowGroup < result.length; ++rowGroup) { for (int pred = 0; pred < leafValues.length; ++pred) { int columnIx = filterColumns[pred]; @@ -1376,10 +1371,10 @@ public boolean[] pickRowGroups( } } result[rowGroup] = sarg.evaluate(leafValues).isNeeded(); - if (bitmap != null) { + if (selection != null) { long firstRow = rowBaseInStripe + rowIndexStride * rowGroup; long lastRow = Math.min(firstRow + rowIndexStride, firstRow + rowsInStripe); - result[rowGroup] &= bitmap.rangeCardinality(firstRow, lastRow) > 0; + result[rowGroup] &= selection.intersects(firstRow, lastRow); } hasSelected = hasSelected || result[rowGroup]; hasSkipped = hasSkipped || (!result[rowGroup]); @@ -1435,7 +1430,7 @@ protected boolean[] pickRowGroups() throws IOException { skipBloomFilters ? null : indexes.getBloomFilterIndex(), false, rowBaseInStripe, - fileIndexResult); + selection); } private void clearStreams() { diff --git a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java index 6683b357fd57..fad4c7e36a66 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java @@ -24,8 +24,6 @@ import org.apache.paimon.data.columnar.ColumnarRowIterator; import org.apache.paimon.data.columnar.VectorizedColumnBatch; import org.apache.paimon.data.columnar.VectorizedRowIterator; -import org.apache.paimon.fileindex.FileIndexResult; -import org.apache.paimon.fileindex.bitmap.BitmapIndexResult; import org.apache.paimon.format.FormatReaderFactory; import org.apache.paimon.format.OrcFormatReaderContext; import org.apache.paimon.format.fs.HadoopReadOnlyFileSystem; @@ -39,6 +37,7 @@ import org.apache.paimon.utils.IOUtils; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Pool; +import org.apache.paimon.utils.RoaringBitmap32; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; @@ -110,7 +109,7 @@ public OrcVectorizedReader createReader(FormatReaderFactory.Context context) context.filePath(), 0, context.fileSize(), - context.fileIndex(), + context.selection(), deletionVectorsEnabled); return new OrcVectorizedReader(orcReader, poolOfBatches); } @@ -260,10 +259,10 @@ private static RecordReader createRecordReader( org.apache.paimon.fs.Path path, long splitStart, long splitLength, - @Nullable FileIndexResult fileIndexResult, + @Nullable RoaringBitmap32 selection, boolean deletionVectorsEnabled) throws IOException { - org.apache.orc.Reader orcReader = createReader(conf, fileIO, path, fileIndexResult); + org.apache.orc.Reader orcReader = createReader(conf, fileIO, path, selection); try { // get offset and length for the stripes that start in the split Pair offsetAndLength = @@ -278,9 +277,7 @@ private static RecordReader createRecordReader( .skipCorruptRecords(OrcConf.SKIP_CORRUPT_DATA.getBoolean(conf)) .tolerateMissingSchema( OrcConf.TOLERATE_MISSING_SCHEMA.getBoolean(conf)); - if (!conjunctPredicates.isEmpty() - && !deletionVectorsEnabled - && !(fileIndexResult instanceof BitmapIndexResult)) { + if (!conjunctPredicates.isEmpty() && !deletionVectorsEnabled && selection == null) { // row group filter push down will make row number change incorrect // so deletion vectors mode and bitmap index cannot work with row group push down options.useSelected(OrcConf.READER_USE_SELECTED.getBoolean(conf)); @@ -346,7 +343,7 @@ public static org.apache.orc.Reader createReader( org.apache.hadoop.conf.Configuration conf, FileIO fileIO, org.apache.paimon.fs.Path path, - @Nullable FileIndexResult fileIndexResult) + @Nullable RoaringBitmap32 selection) throws IOException { // open ORC file and create reader org.apache.hadoop.fs.Path hPath = new org.apache.hadoop.fs.Path(path.toUri()); @@ -359,7 +356,7 @@ public static org.apache.orc.Reader createReader( return new ReaderImpl(hPath, readerOptions) { @Override public RecordReader rows(Options options) throws IOException { - return new RecordReaderImpl(this, options, fileIndexResult); + return new RecordReaderImpl(this, options, selection); } }; } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index 9fef7563718c..dac8940f9078 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -120,7 +120,7 @@ public FileRecordReader createReader(FormatReaderFactory.Context co new ParquetFileReader( ParquetInputFile.fromPath(context.fileIO(), context.filePath()), builder.build(), - context.fileIndex()); + context.selection()); MessageType fileSchema = reader.getFileMetaData().getSchema(); MessageType requestedSchema = clipParquetSchema(fileSchema); reader.setRequestedSchema(requestedSchema); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java index 82d19e448878..038c91445b1a 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetUtil.java @@ -18,7 +18,6 @@ package org.apache.paimon.format.parquet; -import org.apache.paimon.fileindex.FileIndexResult; import org.apache.paimon.format.SimpleStatsExtractor; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; @@ -49,7 +48,7 @@ public class ParquetUtil { */ public static Pair>, SimpleStatsExtractor.FileInfo> extractColumnStats(FileIO fileIO, Path path) throws IOException { - try (ParquetFileReader reader = getParquetReader(fileIO, path, null)) { + try (ParquetFileReader reader = getParquetReader(fileIO, path)) { ParquetMetadata parquetMetadata = reader.getFooter(); List blockMetaDataList = parquetMetadata.getBlocks(); Map> resultStats = new HashMap<>(); @@ -78,12 +77,11 @@ public class ParquetUtil { * @param path the path of parquet files to be read * @return parquet reader, used for reading footer, status, etc. */ - public static ParquetFileReader getParquetReader( - FileIO fileIO, Path path, FileIndexResult fileIndexResult) throws IOException { + public static ParquetFileReader getParquetReader(FileIO fileIO, Path path) throws IOException { return new ParquetFileReader( ParquetInputFile.fromPath(fileIO, path), ParquetReadOptions.builder().build(), - fileIndexResult); + null); } static void assertStatsClass( diff --git a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java index c0a28cdc9bb9..e480e1122270 100644 --- a/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java +++ b/paimon-format/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java @@ -18,8 +18,6 @@ package org.apache.parquet.hadoop; -import org.apache.paimon.fileindex.FileIndexResult; -import org.apache.paimon.fileindex.bitmap.BitmapIndexResult; import org.apache.paimon.format.parquet.ParquetInputFile; import org.apache.paimon.format.parquet.ParquetInputStream; import org.apache.paimon.fs.FileRange; @@ -80,6 +78,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.Closeable; import java.io.IOException; import java.io.InputStream; @@ -220,6 +220,7 @@ private static ParquetMetadata readFooter( private final List blockIndexStores; private final List blockRowRanges; private final boolean blocksFiltered; + @Nullable private final RoaringBitmap32 selection; // not final. in some cases, this may be lazily loaded for backward-compat. private ParquetMetadata footer; @@ -228,17 +229,16 @@ private static ParquetMetadata readFooter( private ColumnChunkPageReadStore currentRowGroup = null; private DictionaryPageReader nextDictionaryReader = null; - private InternalFileDecryptor fileDecryptor = null; - private FileIndexResult fileIndexResult; + private InternalFileDecryptor fileDecryptor; public ParquetFileReader( - InputFile file, ParquetReadOptions options, FileIndexResult fileIndexResult) + InputFile file, ParquetReadOptions options, @Nullable RoaringBitmap32 selection) throws IOException { this.converter = new ParquetMetadataConverter(options); this.file = (ParquetInputFile) file; - this.fileIndexResult = fileIndexResult; this.f = this.file.newStream(); this.options = options; + this.selection = selection; try { this.footer = readFooter(file, options, f, converter); } catch (Exception e) { @@ -359,18 +359,15 @@ private List filterRowGroups(List blocks) throws I } blocks = RowGroupFilter.filterRowGroups(levels, recordFilter, blocks, this); } - if (fileIndexResult instanceof BitmapIndexResult) { - RoaringBitmap32 bitmap = ((BitmapIndexResult) fileIndexResult).get(); + + if (selection != null) { blocks = blocks.stream() .filter( - it -> { - long rowIndexOffset = it.getRowIndexOffset(); - return bitmap.rangeCardinality( - rowIndexOffset, - rowIndexOffset + it.getRowCount()) - > 0; - }) + it -> + selection.intersects( + it.getRowIndexOffset(), + it.getRowIndexOffset() + it.getRowCount())) .collect(Collectors.toList()); } } @@ -762,7 +759,7 @@ private RowRanges getRowRanges(int blockIndex) { paths.keySet(), blocks.get(blockIndex).getRowCount(), blocks.get(blockIndex).getRowIndexOffset(), - fileIndexResult); + selection); blockRowRanges.set(blockIndex, rowRanges); } return rowRanges; diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java index db21a8961a04..d8a06950fa1d 100644 --- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/ColumnIndexFilter.java @@ -18,7 +18,7 @@ package org.apache.parquet.internal.filter2.columnindex; -import org.apache.paimon.fileindex.FileIndexResult; +import org.apache.paimon.utils.RoaringBitmap32; import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.filter2.compat.FilterCompat.FilterPredicateCompat; @@ -58,7 +58,7 @@ * therefore a {@link MissingOffsetIndexException} will be thrown from any {@code visit} methods if * any of the required offset indexes is missing. * - *

Note: The class was copied over to support using {@link FileIndexResult} to filter {@link + *

Note: The class was copied over to support using {@link RoaringBitmap32} to filter {@link * RowRanges}. */ public class ColumnIndexFilter implements Visitor { @@ -68,7 +68,7 @@ public class ColumnIndexFilter implements Visitor { private final Set columns; private final long rowCount; private final long rowIndexOffset; - @Nullable private final FileIndexResult fileIndexResult; + @Nullable private final RoaringBitmap32 selection; private RowRanges allRows; /** @@ -80,8 +80,9 @@ public class ColumnIndexFilter implements Visitor { * @param paths the paths of the columns used in the actual projection; a column not being part * of the projection will be handled as containing {@code null} values only even if the * column has values written in the file - * @param fileIndexResult the file index result; it will use to filter row ranges * @param rowCount the total number of rows in the row-group + * @param rowIndexOffset the offset of the row-group + * @param selection the selected position; it will use to filter or narrow the row ranges * @return the ranges of the possible matching row indexes; the returned ranges will contain all * the rows if any of the required offset index is missing */ @@ -91,7 +92,7 @@ public static RowRanges calculateRowRanges( Set paths, long rowCount, long rowIndexOffset, - @Nullable FileIndexResult fileIndexResult) { + @Nullable RoaringBitmap32 selection) { return filter.accept( new FilterCompat.Visitor() { @Override @@ -105,7 +106,7 @@ public RowRanges visit(FilterPredicateCompat filterPredicateCompat) { paths, rowCount, rowIndexOffset, - fileIndexResult)); + selection)); } catch (MissingOffsetIndexException e) { LOGGER.info(e.getMessage()); return RowRanges.createSingle(rowCount); @@ -129,12 +130,12 @@ private ColumnIndexFilter( Set paths, long rowCount, long rowIndexOffset, - @Nullable FileIndexResult fileIndexResult) { + @Nullable RoaringBitmap32 selection) { this.columnIndexStore = columnIndexStore; this.columns = paths; this.rowCount = rowCount; this.rowIndexOffset = rowIndexOffset; - this.fileIndexResult = fileIndexResult; + this.selection = selection; } private RowRanges allRows() { @@ -232,7 +233,7 @@ private RowRanges applyPredicate( return allRows(); } - return RowRanges.create(rowCount, rowIndexOffset, func.apply(ci), oi, fileIndexResult); + return RowRanges.create(rowCount, rowIndexOffset, func.apply(ci), oi, selection); } @Override diff --git a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java index b199f93958df..7bf4043397ed 100644 --- a/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java +++ b/paimon-format/src/main/java/org/apache/parquet/internal/filter2/columnindex/RowRanges.java @@ -18,8 +18,6 @@ package org.apache.parquet.internal.filter2.columnindex; -import org.apache.paimon.fileindex.FileIndexResult; -import org.apache.paimon.fileindex.bitmap.BitmapIndexResult; import org.apache.paimon.utils.RoaringBitmap32; import org.apache.parquet.filter2.compat.FilterCompat.Filter; @@ -40,12 +38,12 @@ * column index based filtering. To be used iterate over the matching row indexes to be read from a * row-group, retrieve the count of the matching rows or check overlapping of a row index range. * - *

Note: The class was copied over to support using {@link FileIndexResult} to filter {@link - * RowRanges}. Added a new method {@link RowRanges#create(long, long, PrimitiveIterator.OfInt, - * OffsetIndex, FileIndexResult)} + *

Note: The class was copied over to support using selected position to filter or narrow the + * {@link RowRanges}. Added a new method {@link RowRanges#create(long, long, + * PrimitiveIterator.OfInt, OffsetIndex, RoaringBitmap32)} * * @see ColumnIndexFilter#calculateRowRanges(Filter, ColumnIndexStore, Set, long, long, - * FileIndexResult) + * RoaringBitmap32) */ public class RowRanges { @@ -163,31 +161,28 @@ public static RowRanges create( return ranges; } - /** Support using {@link FileIndexResult} to filter the row ranges. */ + /** Support using the selected position to filter or narrow the row ranges. */ public static RowRanges create( long rowCount, long rowIndexOffset, PrimitiveIterator.OfInt pageIndexes, OffsetIndex offsetIndex, - @Nullable FileIndexResult fileIndexResult) { + @Nullable RoaringBitmap32 selection) { RowRanges ranges = new RowRanges(); while (pageIndexes.hasNext()) { int pageIndex = pageIndexes.nextInt(); long firstRowIndex = offsetIndex.getFirstRowIndex(pageIndex); long lastRowIndex = offsetIndex.getLastRowIndex(pageIndex, rowCount); - // using file index result to filter or narrow the row ranges - if (fileIndexResult instanceof BitmapIndexResult) { - RoaringBitmap32 bitmap = ((BitmapIndexResult) fileIndexResult).get(); - RoaringBitmap32 range = - RoaringBitmap32.bitmapOfRange( - rowIndexOffset + firstRowIndex, rowIndexOffset + lastRowIndex + 1); - RoaringBitmap32 result = RoaringBitmap32.and(bitmap, range); - if (result.isEmpty()) { + // using selected position to filter or narrow the row ranges + if (selection != null) { + long first = rowIndexOffset + firstRowIndex; + long last = rowIndexOffset + lastRowIndex; + if (!selection.intersects(first, last + 1)) { continue; } - firstRowIndex = result.first() - rowIndexOffset; - lastRowIndex = result.last() - rowIndexOffset; + firstRowIndex = selection.nextValue((int) first) - rowIndexOffset; + lastRowIndex = selection.previousValue((int) (last)) - rowIndexOffset; } ranges.add(new Range(firstRowIndex, lastRowIndex)); diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java index e0d1d240a9fd..221d524fff5c 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetFormatReadWriteTest.java @@ -75,7 +75,7 @@ public void testEnableBloomFilter(boolean enabled) throws Exception { writer.close(); out.close(); - try (ParquetFileReader reader = ParquetUtil.getParquetReader(fileIO, file, null)) { + try (ParquetFileReader reader = ParquetUtil.getParquetReader(fileIO, file)) { ParquetMetadata parquetMetadata = reader.getFooter(); List blockMetaDataList = parquetMetadata.getBlocks(); for (BlockMetaData blockMetaData : blockMetaDataList) {