@@ -816,6 +816,123 @@ public void testBitmapIndexResultFilterParquetRowRanges() throws Exception {
}
}

@Test
public void testBitmapIndexResultFilterParquetRowRangesAndProjection() throws Exception {
RowType rowType =
RowType.builder()
.field("id", DataTypes.INT())
.field("event", DataTypes.INT())
.field("price", DataTypes.INT())
.build();
// in unaware-bucket mode, files are always split into multiple splits
FileStoreTable table =
createUnawareBucketFileStoreTable(
rowType,
options -> {
options.set(FILE_FORMAT, FILE_FORMAT_PARQUET);
options.set(WRITE_ONLY, true);
options.set(
FileIndexOptions.FILE_INDEX
+ "."
+ BitSliceIndexBitmapFileIndexFactory.BSI_INDEX
+ "."
+ CoreOptions.COLUMNS,
"id,price");
options.set(ParquetOutputFormat.BLOCK_SIZE, "1048576");
options.set(
ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, "100");
options.set(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, "300");
});

int bound = 300000;
Random random = new Random();
Map<Integer, Integer> expectedMap = new HashMap<>();
StreamTableWrite write = table.newWrite(commitUser);
StreamTableCommit commit = table.newCommit(commitUser);
for (int j = 0; j < 1000000; j++) {
int next = random.nextInt(bound);
expectedMap.compute(next, (key, value) -> value == null ? 1 : value + 1);
write.write(GenericRow.of(next, next, next));
}
commit.commit(0, write.prepareCommit(true, 0));
write.close();
commit.close();

// test eq <price> && projection <price>
for (int i = 0; i < 10; i++) {
int key = random.nextInt(bound);
Predicate predicate = new PredicateBuilder(rowType).equal(2, key);
TableScan.Plan plan = table.newScan().plan();
RecordReader<InternalRow> reader =
table.newRead()
.withFilter(predicate)
.withReadType(rowType.project(new int[] {2}))
.createReader(plan.splits());
AtomicInteger cnt = new AtomicInteger(0);
reader.forEachRemaining(
row -> {
cnt.incrementAndGet();
assertThat(row.getFieldCount()).isEqualTo(1);
assertThat(row.getInt(0)).isEqualTo(key);
});
assertThat(cnt.get()).isEqualTo(expectedMap.getOrDefault(key, 0));
reader.close();
}

// test eq <id> && projection <event>
for (int i = 0; i < 10; i++) {
int key = random.nextInt(bound);
Predicate predicate = new PredicateBuilder(rowType).equal(0, key);
TableScan.Plan plan = table.newScan().plan();
RecordReader<InternalRow> reader =
table.newRead()
.withFilter(predicate)
.withReadType(rowType.project(new int[] {1}))
.createReader(plan.splits());
AtomicInteger cnt = new AtomicInteger(0);
reader.forEachRemaining(
row -> {
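// id, event, and price were all written with the same value, so the
// projected event column must equal the filtered id key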
cnt.incrementAndGet();
assertThat(row.getFieldCount()).isEqualTo(1);
assertThat(row.getInt(0)).isEqualTo(key);
});
assertThat(cnt.get()).isEqualTo(expectedMap.getOrDefault(key, 0));
reader.close();
}

// test filter <id, price> && projection <event>
for (int i = 0; i < 10; i++) {
int max = random.nextInt(bound) + 1;
int min = random.nextInt(max);
Predicate predicate =
PredicateBuilder.and(
new PredicateBuilder(rowType).greaterOrEqual(0, min),
new PredicateBuilder(rowType).lessOrEqual(2, max));
TableScan.Plan plan = table.newScan().plan();
RecordReader<InternalRow> reader =
table.newRead()
.withFilter(predicate)
.withReadType(rowType.project(new int[] {1}))
.createReader(plan.splits());

AtomicInteger cnt = new AtomicInteger(0);
reader.forEachRemaining(
row -> {
cnt.incrementAndGet();
assertThat(row.getFieldCount()).isEqualTo(1);
assertThat(row.getInt(0)).isGreaterThanOrEqualTo(min);
assertThat(row.getInt(0)).isLessThanOrEqualTo(max);
});
Optional<Integer> reduce =
expectedMap.entrySet().stream()
.filter(x -> x.getKey() >= min && x.getKey() <= max)
.map(Map.Entry::getValue)
.reduce(Integer::sum);
assertThat(cnt.get()).isEqualTo(reduce.orElse(0));
reader.close();
}
}

@Test
public void testWithShardAppendTable() throws Exception {
FileStoreTable table = createFileStoreTable(conf -> conf.set(BUCKET, -1));
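For reference, a condensed sketch of the read path exercised by the new test above, reusing its table and rowType (the variable names and the literal 42 are illustrative, not part of this change): the filter references id (field 0) while the projection keeps only event (field 1), so the reader must materialize id internally even though it is not returned.

    // Sketch: filter on "id", project only "event". With this change the
    // Parquet reader widens its read schema to include "id" so the pushed-down
    // record-level filter can still be evaluated.
    Predicate predicate = new PredicateBuilder(rowType).equal(0, 42);
    RecordReader<InternalRow> reader =
            table.newRead()
                    .withFilter(predicate)
                    .withReadType(rowType.project(new int[] {1}))
                    .createReader(table.newScan().plan().splits());
    reader.forEachRemaining(row -> System.out.println(row.getInt(0)));
    reader.close();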
@@ -61,7 +61,11 @@ Options getOptions() {
public FormatReaderFactory createReaderFactory(
RowType projectedRowType, List<Predicate> filters) {
return new ParquetReaderFactory(
options, projectedRowType, readBatchSize, ParquetFilters.convert(filters));
options,
projectedRowType,
readBatchSize,
ParquetFilters.convert(filters),
ParquetFilters.getFilterFields(filters));
}

@Override
@@ -97,13 +97,27 @@ public class ParquetReaderFactory implements FormatReaderFactory {
private final Set<Integer> unknownFieldsIndices = new HashSet<>();

public ParquetReaderFactory(
Options conf, RowType readType, int batchSize, FilterCompat.Filter filter) {
Options conf,
RowType readType,
int batchSize,
FilterCompat.Filter filter,
List<DataField> filterFields) {
this.conf = conf;
this.readFields = readType.getFields().toArray(new DataField[0]);
this.readFields = getReadFields(readType, filterFields);
this.batchSize = batchSize;
this.filter = filter;
}

private DataField[] getReadFields(RowType readType, List<DataField> filterFields) {
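// Widen the read schema: append fields referenced only by the filter
// (matched by name) so the reader materializes them.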
List<DataField> readFields = new ArrayList<>(readType.getFields());
for (DataField field : filterFields) {
if (readFields.stream().noneMatch(f -> f.name().equals(field.name()))) {
readFields.add(field);
}
}
return readFields.toArray(new DataField[0]);
}

// TODO: remove this when new reader is stable
public FileRecordReader<InternalRow> createReaderOld(FormatReaderFactory.Context context)
throws IOException {
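To see the schema widening in isolation, here is a minimal, self-contained sketch of the merge performed by getReadFields (it reuses Paimon's RowType, DataField, and DataTypes as in the test above; the -1 field ids follow the placeholder convention of FilterFieldsVisitor below):

    import org.apache.paimon.types.DataField;
    import org.apache.paimon.types.DataTypes;
    import org.apache.paimon.types.RowType;

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    public class ReadSchemaSketch {

        // Same merge as ParquetReaderFactory#getReadFields: append filter-only
        // fields, matching by name, so the reader materializes them.
        static DataField[] getReadFields(RowType readType, List<DataField> filterFields) {
            List<DataField> fields = new ArrayList<>(readType.getFields());
            for (DataField field : filterFields) {
                if (fields.stream().noneMatch(f -> f.name().equals(field.name()))) {
                    fields.add(field);
                }
            }
            return fields.toArray(new DataField[0]);
        }

        public static void main(String[] args) {
            RowType projected = RowType.builder().field("event", DataTypes.INT()).build();
            List<DataField> filterFields =
                    Collections.singletonList(new DataField(-1, "id", DataTypes.INT()));
            for (DataField field : getReadFields(projected, filterFields)) {
                System.out.println(field.name()); // prints: event, id
            }
        }
    }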
@@ -27,6 +27,7 @@
import org.apache.paimon.types.BinaryType;
import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.CharType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypeVisitor;
import org.apache.paimon.types.DateType;
import org.apache.paimon.types.DecimalType;
@@ -48,7 +49,11 @@
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.io.api.Binary;

import javax.annotation.Nullable;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/** Convert {@link Predicate} to {@link FilterCompat.Filter}. */
public class ParquetFilters {
@@ -76,6 +81,14 @@ public static FilterCompat.Filter convert(List<Predicate> predicates) {
return result != null ? FilterCompat.get(result) : FilterCompat.NOOP;
}

public static List<DataField> getFilterFields(@Nullable List<Predicate> predicates) {
return predicates == null
? Collections.emptyList()
: predicates.stream()
.flatMap(p -> p.visit(FilterFieldsVisitor.INSTANCE).stream())
.collect(Collectors.toList());
}

@SuppressWarnings({"unchecked", "rawtypes"})
private static class ConvertFilterToParquet implements FunctionVisitor<FilterPredicate> {

@@ -309,4 +322,85 @@ public Operators.Column<?> visit(RowType rowType) {
throw new UnsupportedOperationException();
}
}

/** Collects the data fields referenced by a filter predicate. */
private static class FilterFieldsVisitor implements FunctionVisitor<List<DataField>> {

public static final FilterFieldsVisitor INSTANCE = new FilterFieldsVisitor();

@Override
public List<DataField> visitIsNotNull(FieldRef fieldRef) {
return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type()));
}

@Override
public List<DataField> visitIsNull(FieldRef fieldRef) {
return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type()));
}

@Override
public List<DataField> visitStartsWith(FieldRef fieldRef, Object literal) {
return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type()));
}

@Override
public List<DataField> visitEndsWith(FieldRef fieldRef, Object literal) {
return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type()));
}

@Override
public List<DataField> visitContains(FieldRef fieldRef, Object literal) {
return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type()));
}

@Override
public List<DataField> visitLessThan(FieldRef fieldRef, Object literal) {
return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type()));
}

@Override
public List<DataField> visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type()));
}

@Override
public List<DataField> visitNotEqual(FieldRef fieldRef, Object literal) {
return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type()));
}

@Override
public List<DataField> visitLessOrEqual(FieldRef fieldRef, Object literal) {
return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type()));
}

@Override
public List<DataField> visitEqual(FieldRef fieldRef, Object literal) {
return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type()));
}

@Override
public List<DataField> visitGreaterThan(FieldRef fieldRef, Object literal) {
return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type()));
}

@Override
public List<DataField> visitIn(FieldRef fieldRef, List<Object> literals) {
return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type()));
}

@Override
public List<DataField> visitNotIn(FieldRef fieldRef, List<Object> literals) {
return Collections.singletonList(new DataField(-1, fieldRef.name(), fieldRef.type()));
}

@Override
public List<DataField> visitAnd(List<List<DataField>> children) {
return children.stream().flatMap(List::stream).collect(Collectors.toList());
}

@Override
public List<DataField> visitOr(List<List<DataField>> children) {
return children.stream().flatMap(List::stream).collect(Collectors.toList());
}
}
}
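A usage sketch for getFilterFields, assuming the Paimon predicate API as used in the test above: for a conjunction over two columns it returns one placeholder DataField per predicate leaf; duplicate names are harmless because getReadFields above de-duplicates by name.

    RowType rowType =
            RowType.builder()
                    .field("id", DataTypes.INT())
                    .field("price", DataTypes.INT())
                    .build();
    Predicate predicate =
            PredicateBuilder.and(
                    new PredicateBuilder(rowType).greaterOrEqual(0, 10),
                    new PredicateBuilder(rowType).lessOrEqual(1, 20));
    // Expected output: "id" and "price", each with the placeholder id -1.
    for (DataField field :
            ParquetFilters.getFilterFields(Collections.singletonList(predicate))) {
        System.out.println(field.name() + " " + field.id());
    }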
@@ -47,6 +47,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -593,7 +594,8 @@ private ColumnarRowIterator createRecordIterator(RowType rowType, List<InternalR
writer.close();

ParquetReaderFactory readerFactory =
new ParquetReaderFactory(new Options(), rowType, 1024, FilterCompat.NOOP);
new ParquetReaderFactory(
new Options(), rowType, 1024, FilterCompat.NOOP, Collections.emptyList());

RecordReader<InternalRow> reader =
readerFactory.createReader(