From 939acbddbfdb9d50675e9b5aeea84f9554e8c0a1 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Tue, 1 Apr 2025 20:38:23 +0800 Subject: [PATCH 1/2] [parquet] Fix ColumnIndexFilter mistakenly returning empty results when the projection doesn't contain the filter fields --- .../table/AppendOnlySimpleTableTest.java | 117 ++++++++++++++++++ .../format/parquet/ParquetFileFormat.java | 6 +- .../format/parquet/ParquetReaderFactory.java | 18 ++- .../filter2/predicate/ParquetFilters.java | 90 ++++++++++++++ .../parquet/ParquetColumnVectorTest.java | 4 +- .../format/parquet/ParquetReadWriteTest.java | 40 ++++-- 6 files changed, 260 insertions(+), 15 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java index 715584cdc3cc..3a9d160b88d5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java @@ -816,6 +816,123 @@ public void testBitmapIndexResultFilterParquetRowRanges() throws Exception { } } + @Test + public void testBitmapIndexResultFilterParquetRowRangesAndProjection() throws Exception { + RowType rowType = + RowType.builder() + .field("id", DataTypes.INT()) + .field("event", DataTypes.INT()) + .field("price", DataTypes.INT()) + .build(); + // in unaware-bucket mode, we split files into splits all the time + FileStoreTable table = + createUnawareBucketFileStoreTable( + rowType, + options -> { + options.set(FILE_FORMAT, FILE_FORMAT_PARQUET); + options.set(WRITE_ONLY, true); + options.set( + FileIndexOptions.FILE_INDEX + + "." + + BitSliceIndexBitmapFileIndexFactory.BSI_INDEX + + "." 
+ + CoreOptions.COLUMNS, + "id,price"); + options.set(ParquetOutputFormat.BLOCK_SIZE, "1048576"); + options.set( + ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100"); + options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300"); + }); + + int bound = 300000; + Random random = new Random(); + Map expectedMap = new HashMap<>(); + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + for (int j = 0; j < 1000000; j++) { + int next = random.nextInt(bound); + expectedMap.compute(next, (key, value) -> value == null ? 1 : value + 1); + write.write(GenericRow.of(next, next, next)); + } + commit.commit(0, write.prepareCommit(true, 0)); + write.close(); + commit.close(); + + // test eq && projection + for (int i = 0; i < 10; i++) { + int key = random.nextInt(bound); + Predicate predicate = new PredicateBuilder(rowType).equal(2, key); + TableScan.Plan plan = table.newScan().plan(); + RecordReader reader = + table.newRead() + .withFilter(predicate) + .withReadType(rowType.project(new int[] {2})) + .createReader(plan.splits()); + AtomicInteger cnt = new AtomicInteger(0); + reader.forEachRemaining( + row -> { + cnt.incrementAndGet(); + assertThat(row.getFieldCount()).isEqualTo(1); + assertThat(row.getInt(0)).isEqualTo(key); + }); + assertThat(cnt.get()).isEqualTo(expectedMap.getOrDefault(key, 0)); + reader.close(); + } + + // test eq && projection + for (int i = 0; i < 10; i++) { + int key = random.nextInt(bound); + Predicate predicate = new PredicateBuilder(rowType).equal(0, key); + TableScan.Plan plan = table.newScan().plan(); + RecordReader reader = + table.newRead() + .withFilter(predicate) + .withReadType(rowType.project(new int[] {1})) + .createReader(plan.splits()); + AtomicInteger cnt = new AtomicInteger(0); + reader.forEachRemaining( + row -> { + cnt.incrementAndGet(); + assertThat(row.getFieldCount()).isEqualTo(1); + assertThat(row.getInt(0)).isEqualTo(key); + }); + 
assertThat(cnt.get()).isEqualTo(expectedMap.getOrDefault(key, 0)); + reader.close(); + } + + // test filter && projection + for (int i = 0; i < 10; i++) { + int max = random.nextInt(bound) + 1; + int min = random.nextInt(max); + Predicate predicate = + PredicateBuilder.and( + new PredicateBuilder(rowType).greaterOrEqual(0, min), + new PredicateBuilder(rowType).lessOrEqual(2, max)); + TableScan.Plan plan = table.newScan().plan(); + RecordReader reader = + table.newRead() + .withFilter(predicate) + .withReadType(rowType.project(new int[] {1})) + .createReader(plan.splits()); + + AtomicInteger cnt = new AtomicInteger(0); + reader.forEachRemaining( + row -> { + cnt.addAndGet(1); + assertThat(row.getFieldCount()).isEqualTo(1); + assertThat(row.getInt(0)).isGreaterThanOrEqualTo(min); + assertThat(row.getInt(0)).isLessThanOrEqualTo(max); + }); + Optional reduce = + expectedMap.entrySet().stream() + .filter(x -> x.getKey() >= min && x.getKey() <= max) + .map(Map.Entry::getValue) + .reduce(Integer::sum); + assertThat(cnt.get()).isEqualTo(reduce.orElse(0)); + reader.close(); + } + } + @Test public void testWithShardAppendTable() throws Exception { FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET, -1)); diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java index 24ba9f300c5f..6129e3c86c1a 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetFileFormat.java @@ -61,7 +61,11 @@ Options getOptions() { public FormatReaderFactory createReaderFactory( RowType projectedRowType, List filters) { return new ParquetReaderFactory( - options, projectedRowType, readBatchSize, ParquetFilters.convert(filters)); + options, + projectedRowType, + readBatchSize, + ParquetFilters.convert(filters), + 
ParquetFilters.getFilterFields(filters)); } @Override diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index 15db2d113aca..cb529de62764 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -97,13 +97,27 @@ public class ParquetReaderFactory implements FormatReaderFactory { private final Set unknownFieldsIndices = new HashSet<>(); public ParquetReaderFactory( - Options conf, RowType readType, int batchSize, FilterCompat.Filter filter) { + Options conf, + RowType readType, + int batchSize, + FilterCompat.Filter filter, + List filterFields) { this.conf = conf; - this.readFields = readType.getFields().toArray(new DataField[0]); + this.readFields = getReadFields(readType, filterFields); this.batchSize = batchSize; this.filter = filter; } + private DataField[] getReadFields(RowType readType, List filterFields) { + List readFields = new ArrayList<>(readType.getFields()); + for (DataField field : filterFields) { + if (readFields.stream().noneMatch(f -> f.name().equals(field.name()))) { + readFields.add(field); + } + } + return readFields.toArray(new DataField[0]); + } + // TODO: remove this when new reader is stable public FileRecordReader createReaderOld(FormatReaderFactory.Context context) throws IOException { diff --git a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java index cb2e95122773..0a99e9e483b5 100644 --- a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java +++ b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java @@ -27,6 +27,7 @@ import org.apache.paimon.types.BinaryType; import 
org.apache.paimon.types.BooleanType; import org.apache.paimon.types.CharType; +import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataTypeVisitor; import org.apache.paimon.types.DateType; import org.apache.paimon.types.DecimalType; @@ -48,7 +49,9 @@ import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.io.api.Binary; +import java.util.Collections; import java.util.List; +import java.util.stream.Collectors; /** Convert {@link Predicate} to {@link FilterCompat.Filter}. */ public class ParquetFilters { @@ -76,6 +79,12 @@ public static FilterCompat.Filter convert(List predicates) { return result != null ? FilterCompat.get(result) : FilterCompat.NOOP; } + public static List getFilterFields(List predicates) { + return predicates.stream() + .flatMap(p -> p.visit(FilterFieldsVisitor.INSTANCE).stream()) + .collect(Collectors.toList()); + } + @SuppressWarnings({"unchecked", "rawtypes"}) private static class ConvertFilterToParquet implements FunctionVisitor { @@ -309,4 +318,85 @@ public Operators.Column visit(RowType rowType) { throw new UnsupportedOperationException(); } } + + /** To get fields in filter. 
*/ + private static class FilterFieldsVisitor implements FunctionVisitor> { + + public static final FilterFieldsVisitor INSTANCE = new FilterFieldsVisitor(); + + @Override + public List visitIsNotNull(FieldRef fieldRef) { + return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type())); + } + + @Override + public List visitIsNull(FieldRef fieldRef) { + return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type())); + } + + @Override + public List visitStartsWith(FieldRef fieldRef, Object literal) { + return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type())); + } + + @Override + public List visitEndsWith(FieldRef fieldRef, Object literal) { + return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type())); + } + + @Override + public List visitContains(FieldRef fieldRef, Object literal) { + return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type())); + } + + @Override + public List visitLessThan(FieldRef fieldRef, Object literal) { + return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type())); + } + + @Override + public List visitGreaterOrEqual(FieldRef fieldRef, Object literal) { + return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type())); + } + + @Override + public List visitNotEqual(FieldRef fieldRef, Object literal) { + return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type())); + } + + @Override + public List visitLessOrEqual(FieldRef fieldRef, Object literal) { + return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type())); + } + + @Override + public List visitEqual(FieldRef fieldRef, Object literal) { + return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type())); + } + + @Override + public List visitGreaterThan(FieldRef fieldRef, Object literal) { + return Collections.singletonList(new DataField(-1, 
fieldRef.name(), fieldRef.type())); + } + + @Override + public List visitIn(FieldRef fieldRef, List literals) { + return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type())); + } + + @Override + public List visitNotIn(FieldRef fieldRef, List literals) { + return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type())); + } + + @Override + public List visitAnd(List> children) { + return children.stream().flatMap(List::stream).collect(Collectors.toList()); + } + + @Override + public List visitOr(List> children) { + return children.stream().flatMap(List::stream).collect(Collectors.toList()); + } + } } diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java index ee20945d64a5..b38b45df6a9c 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java @@ -47,6 +47,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -593,7 +594,8 @@ private ColumnarRowIterator createRecordIterator(RowType rowType, List reader = readerFactory.createReader( diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java index 58fc10cc5174..6b06661852b5 100644 --- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java +++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java @@ -33,6 +33,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.options.Options; +import 
org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.reader.RecordReader; import org.apache.paimon.types.ArrayType; @@ -307,7 +308,8 @@ void testProjection(int rowGroupSize) throws IOException { .fields(fieldTypes, new String[] {"f7", "f2", "f4"}) .build(), 500, - FilterCompat.NOOP); + FilterCompat.NOOP, + Collections.emptyList()); AtomicInteger cnt = new AtomicInteger(0); RecordReader reader = @@ -351,7 +353,8 @@ void testProjectionReadUnknownField(int rowGroupSize) throws IOException { .fields(fieldTypes, new String[] {"f7", "f2", "f4", "f99"}) .build(), 500, - FilterCompat.NOOP); + FilterCompat.NOOP, + Collections.emptyList()); AtomicInteger cnt = new AtomicInteger(0); RecordReader reader = @@ -390,7 +393,8 @@ void testReadRowPosition() throws IOException { new Options(), RowType.builder().fields(fieldTypes, new String[] {"f7"}).build(), batchSize, - FilterCompat.NOOP); + FilterCompat.NOOP, + Collections.emptyList()); AtomicInteger cnt = new AtomicInteger(0); try (RecordReader reader = @@ -432,15 +436,14 @@ void testReadRowPositionWithRandomFilter() throws IOException { new PredicateBuilder( new RowType( Collections.singletonList(new DataField(0, "f4", new IntType())))); - FilterCompat.Filter filter = - ParquetFilters.convert( - PredicateBuilder.splitAnd(builder.greaterThan(0, randomStart))); + List filters = PredicateBuilder.splitAnd(builder.greaterThan(0, randomStart)); ParquetReaderFactory format = new ParquetReaderFactory( new Options(), RowType.builder().fields(fieldTypes, new String[] {"f4"}).build(), batchSize, - filter); + ParquetFilters.convert(filters), + ParquetFilters.getFilterFields(filters)); AtomicBoolean isFirst = new AtomicBoolean(true); try (RecordReader reader = @@ -479,7 +482,11 @@ public void testNestedRead(int rowGroupSize, String writerType) throws Exception } ParquetReaderFactory format = new ParquetReaderFactory( - new Options(), NESTED_ARRAY_MAP_TYPE, 500, 
FilterCompat.NOOP); + new Options(), + NESTED_ARRAY_MAP_TYPE, + 500, + FilterCompat.NOOP, + Collections.emptyList()); RecordReader reader = format.createReader( new FormatReaderContext( @@ -497,7 +504,12 @@ public void testDecimalWithFixedLengthRead() throws Exception { Path path = createDecimalFile(number, folder, 10); ParquetReaderFactory format = - new ParquetReaderFactory(new Options(), DECIMAL_TYPE, 500, FilterCompat.NOOP); + new ParquetReaderFactory( + new Options(), + DECIMAL_TYPE, + 500, + FilterCompat.NOOP, + Collections.emptyList()); RecordReader reader = format.createReader( new FormatReaderContext( @@ -657,7 +669,12 @@ public void testReadBinaryWrittenByParquet() throws Exception { .build(); ParquetReaderFactory format = - new ParquetReaderFactory(new Options(), paimonRowType, 500, FilterCompat.NOOP); + new ParquetReaderFactory( + new Options(), + paimonRowType, + 500, + FilterCompat.NOOP, + Collections.emptyList()); RecordReader reader = format.createReader( @@ -704,7 +721,8 @@ private Path createTempParquetFileByPaimon( private int testReadingFile(List expected, Path path) throws IOException { ParquetReaderFactory format = - new ParquetReaderFactory(new Options(), ROW_TYPE, 500, FilterCompat.NOOP); + new ParquetReaderFactory( + new Options(), ROW_TYPE, 500, FilterCompat.NOOP, Collections.emptyList()); RecordReader reader = format.createReader( From cbc0e07e0c1f15b5ef7c0502ce963b4d6644d90c Mon Sep 17 00:00:00 2001 From: yuzelin Date: Tue, 1 Apr 2025 20:53:28 +0800 Subject: [PATCH 2/2] [parquet] Handle null predicates in ParquetFilters.getFilterFields --- .../parquet/filter2/predicate/ParquetFilters.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java index 0a99e9e483b5..1c10c9de0e4f 100644 --- a/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java +++ 
b/paimon-format/src/main/java/org/apache/parquet/filter2/predicate/ParquetFilters.java @@ -49,6 +49,8 @@ import org.apache.parquet.filter2.compat.FilterCompat; import org.apache.parquet.io.api.Binary; +import javax.annotation.Nullable; + import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -79,10 +81,12 @@ public static FilterCompat.Filter convert(List predicates) { return result != null ? FilterCompat.get(result) : FilterCompat.NOOP; } - public static List getFilterFields(List predicates) { - return predicates.stream() - .flatMap(p -> p.visit(FilterFieldsVisitor.INSTANCE).stream()) - .collect(Collectors.toList()); + public static List getFilterFields(@Nullable List predicates) { + return predicates == null + ? Collections.emptyList() + : predicates.stream() + .flatMap(p -> p.visit(FilterFieldsVisitor.INSTANCE).stream()) + .collect(Collectors.toList()); } @SuppressWarnings({"unchecked", "rawtypes"})