@@ -80,7 +80,9 @@ public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
    /** Note: Keep this thread-safe. */
    @Override
    protected boolean filterByStats(ManifestEntry entry) {
-       if (filter == null) {
+       Predicate safeFilter =
+               simpleStatsEvolutions.toEvolutionSafeStatsFilter(entry.file().schemaId(), filter);
+       if (safeFilter == null) {
            return true;
        }

@@ -91,7 +93,7 @@ protected boolean filterByStats(ManifestEntry entry) {
                        entry.file().rowCount(),
                        entry.file().valueStatsCols());

-       return filter.test(
+       return safeFilter.test(
                entry.file().rowCount(),
                stats.minValues(),
                stats.maxValues(),
@@ -110,7 +112,7 @@ private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry
                dataFilterMapping.computeIfAbsent(
                        entry.file().schemaId(),
                        id ->
-                               simpleStatsEvolutions.tryDevolveFilter(
+                               simpleStatsEvolutions.toEvolutionSafeStatsFilter(
                                        entry.file().schemaId(), filter));

try (FileIndexPredicate predicate =
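The pattern introduced above — convert first, then null-check — gives filterByStats a simple contract: when no evolution-safe filter exists for the file's schema, the file must be kept. A minimal stand-alone sketch of that contract, using plain JDK types rather than Paimon's Predicate API:

    import java.util.function.LongPredicate;

    public class SafeFilterSketch {

        // Stats pruning must never produce false negatives: if no evolution-safe
        // filter exists for this file's schema, keep the file and let the engine
        // re-apply the full predicate on the rows it actually reads.
        static boolean keepFile(LongPredicate safeFilter, long maxValue) {
            if (safeFilter == null) {
                return true; // cannot prune safely
            }
            return safeFilter.test(maxValue);
        }

        public static void main(String[] args) {
            System.out.println(keepFile(null, 100L)); // true: filter not convertible
            System.out.println(keepFile(v -> v >= 50, 100L)); // true: stats may match
            System.out.println(keepFile(v -> v > 200, 100L)); // false: safe to prune
        }
    }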
@@ -85,10 +85,11 @@ public KeyValueFileStoreScan(
                manifestFileFactory,
                scanManifestParallelism);
        this.bucketSelectConverter = bucketSelectConverter;
+       // NOTE: don't add the key prefix to field names, because fieldKeyStatsConverters is
+       // used for filter conversion
        this.fieldKeyStatsConverters =
                new SimpleStatsEvolutions(
-                       sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)),
-                       schema.id());
+                       sid -> scanTableSchema(sid).trimmedPrimaryKeysFields(), schema.id());
        this.fieldValueStatsConverters =
                new SimpleStatsEvolutions(
                        sid -> keyValueFieldsExtractor.valueFields(scanTableSchema(sid)),
@@ -119,21 +120,24 @@ public FileStoreScan enableValueFilter() {
    /** Note: Keep this thread-safe. */
    @Override
    protected boolean filterByStats(ManifestEntry entry) {
-       DataFileMeta file = entry.file();
        if (isValueFilterEnabled() && !filterByValueFilter(entry)) {
            return false;
        }

-       if (keyFilter != null) {
-           SimpleStatsEvolution.Result stats =
-                   fieldKeyStatsConverters
-                           .getOrCreate(file.schemaId())
-                           .evolution(file.keyStats(), file.rowCount(), null);
-           return keyFilter.test(
-                   file.rowCount(), stats.minValues(), stats.maxValues(), stats.nullCounts());
+       Predicate safeKeyFilter =
+               fieldKeyStatsConverters.toEvolutionSafeStatsFilter(
+                       entry.file().schemaId(), keyFilter);
+       if (safeKeyFilter == null) {
+           return true;
        }

-       return true;
+       DataFileMeta file = entry.file();
+       SimpleStatsEvolution.Result stats =
+               fieldKeyStatsConverters
+                       .getOrCreate(file.schemaId())
+                       .evolution(file.keyStats(), file.rowCount(), null);
+       return safeKeyFilter.test(
+               file.rowCount(), stats.minValues(), stats.maxValues(), stats.nullCounts());
    }

@Override
@@ -156,7 +160,7 @@ private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry
                schemaId2DataFilter.computeIfAbsent(
                        entry.file().schemaId(),
                        id ->
-                               fieldValueStatsConverters.tryDevolveFilter(
+                               fieldValueStatsConverters.toEvolutionSafeStatsFilter(
                                        entry.file().schemaId(), valueFilter));
return predicate.evaluate(dataPredicate).remain();
} catch (IOException e) {
@@ -225,12 +229,19 @@ private boolean filterByValueFilter(ManifestEntry entry) {
            return ((FilteredManifestEntry) entry).selected();
        }

+       Predicate safeValueFilter =
+               fieldValueStatsConverters.toEvolutionSafeStatsFilter(
+                       entry.file().schemaId(), valueFilter);
+       if (safeValueFilter == null) {
+           return true;
+       }
+
        DataFileMeta file = entry.file();
        SimpleStatsEvolution.Result result =
                fieldValueStatsConverters
                        .getOrCreate(file.schemaId())
                        .evolution(file.valueStats(), file.rowCount(), file.valueStatsCols());
-       return valueFilter.test(
+       return safeValueFilter.test(
                file.rowCount(),
                result.minValues(),
                result.maxValues(),
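A subtlety in the dataFilterMapping / schemaId2DataFilter caches used above: ConcurrentHashMap.computeIfAbsent stores no entry when the mapping function returns null, so a filter that cannot be made evolution-safe for a given schema id is recomputed on every call rather than cached. A small sketch of the memoization shape (the devolve helper below is a hypothetical stand-in for toEvolutionSafeStatsFilter):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class FilterCacheSketch {

        private final Map<Long, String> schemaIdToFilter = new ConcurrentHashMap<>();

        // computeIfAbsent keeps lookups thread-safe, matching the
        // "Keep this thread-safe" notes on the scan methods above.
        String safeFilterFor(long dataSchemaId) {
            return schemaIdToFilter.computeIfAbsent(dataSchemaId, this::devolve);
        }

        // Stand-in for toEvolutionSafeStatsFilter; returning null means "no safe
        // filter" and, notably, leaves the map without an entry for that key.
        private String devolve(long schemaId) {
            return schemaId % 2 == 0 ? "filter@" + schemaId : null;
        }
    }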
@@ -131,11 +131,16 @@ public CastFieldGetter[] getCastMapping() {
     * @param tableFields the table fields
     * @param dataFields the underlying data fields
     * @param filters the filters
+    * @param forData true to devolve the filters for filtering data files; otherwise they are
+    *     devolved for filtering manifest entries
     * @return the data filters
     */
    @Nullable
-   public static List<Predicate> devolveDataFilters(
-           List<DataField> tableFields, List<DataField> dataFields, List<Predicate> filters) {
+   public static List<Predicate> devolveFilters(
+           List<DataField> tableFields,
+           List<DataField> dataFields,
+           List<Predicate> filters,
+           boolean forData) {
        if (filters == null) {
            return null;
        }
@@ -154,19 +159,36 @@ public static List<Predicate> devolveDataFilters(
                                    String.format("Find no field %s", predicate.fieldName()));
                    DataField dataField = idToDataFields.get(tableField.id());
                    if (dataField == null) {
-                       return Optional.empty();
+                       // For example, after adding a field b, a filter on b is still safe for
+                       // old file meta without field b, because the index mapping array can
+                       // handle the null
+                       return forData ? Optional.empty() : Optional.of(predicate);
                    }

+                   Optional<List<Object>> castedLiterals =
+                           CastExecutors.castLiteralsWithEvolution(
+                                   predicate.literals(), predicate.type(), dataField.type());
+
+                   // unsafe
+                   if (!castedLiterals.isPresent()) {
+                       return Optional.empty();
+                   }
+
-                   return CastExecutors.castLiteralsWithEvolution(
-                           predicate.literals(), predicate.type(), dataField.type())
-                           .map(
-                                   literals ->
-                                           new LeafPredicate(
-                                                   predicate.function(),
-                                                   dataField.type(),
-                                                   indexOf(dataField, idToDataFields),
-                                                   dataField.name(),
-                                                   literals));
+                   if (forData) {
+                       // For data, the filter will be pushed down to the data file, so it must
+                       // use the index and literal type of the data file
+                       return Optional.of(
+                               new LeafPredicate(
+                                       predicate.function(),
+                                       dataField.type(),
+                                       indexOf(dataField, idToDataFields),
+                                       dataField.name(),
+                                       castedLiterals.get()));
+                   } else {
+                       // For meta, the index mapping array will map the index and cast the
+                       // literals, so just return the predicate itself.
+                       // In other words, return it only if it is safe.
+                       return Optional.of(predicate);
+                   }
                };

        for (Predicate predicate : filters) {
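The forData flag encodes a small decision table per leaf predicate: a field missing from the data schema is droppable for data-file pushdown but safe for manifest stats (where evolution supplies nulls), and uncastable literals are unsafe for both paths. A condensed sketch of that branching, with strings standing in for predicates:

    import java.util.Optional;

    public class DevolveSketch {

        // fieldInDataSchema: the predicate's field exists in the (older) data schema.
        // castable: the literals can be cast across the type change.
        static Optional<String> devolve(
                boolean fieldInDataSchema, boolean castable, boolean forData) {
            if (!fieldInDataSchema) {
                // Added field: unreadable from the data file itself, but manifest
                // stats evolution can supply nulls, so the meta path keeps it.
                return forData ? Optional.empty() : Optional.of("original predicate");
            }
            if (!castable) {
                return Optional.empty(); // unsafe either way
            }
            // The data path rewrites index and literal type for the data file; the
            // meta path relies on the index mapping array, so the original suffices.
            return forData
                    ? Optional.of("rewritten predicate")
                    : Optional.of("original predicate");
        }
    }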
@@ -19,14 +19,14 @@
package org.apache.paimon.stats;

import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.schema.IndexCastMapping;
import org.apache.paimon.schema.SchemaEvolutionUtil;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;

import javax.annotation.Nullable;

-import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
@@ -77,18 +77,25 @@ public SimpleStatsEvolution getOrCreate(long dataSchemaId) {
                });
    }

+   /**
+    * If the file's schema id != the current table schema id, convert the filter to an
+    * evolution-safe filter, or return null if it cannot be converted.
+    */
    @Nullable
-   public Predicate tryDevolveFilter(long dataSchemaId, Predicate filter) {
-       if (tableSchemaId == dataSchemaId) {
+   public Predicate toEvolutionSafeStatsFilter(long dataSchemaId, @Nullable Predicate filter) {
+       if (filter == null || dataSchemaId == tableSchemaId) {
            return filter;
        }

+       // For a filter p1 && p2, if only p1 is safe, we can return p1 alone as a best-effort
+       // filter and let the compute engine evaluate p2.
+       List<Predicate> filters = PredicateBuilder.splitAnd(filter);
        List<Predicate> devolved =
                Objects.requireNonNull(
-                       SchemaEvolutionUtil.devolveDataFilters(
-                               schemaFields.apply(tableSchemaId),
-                               schemaFields.apply(dataSchemaId),
-                               Collections.singletonList(filter)));
-       return devolved.isEmpty() ? null : devolved.get(0);
+                       SchemaEvolutionUtil.devolveFilters(
+                               tableDataFields, schemaFields.apply(dataSchemaId), filters, false));
+
+       return devolved.isEmpty() ? null : PredicateBuilder.and(devolved);
    }

    public List<DataField> tableDataFields() {
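To see what splitting the conjunction buys, suppose the table filter is a > 1 AND g = 2.5 and column g changed type so that its literal cannot be cast back for an old file. The old single-predicate path dropped the whole filter; the new path keeps the safe conjunct. A rough stand-alone walk-through under that assumption, with strings standing in for Paimon predicates:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class BestEffortFilterSketch {

        public static void main(String[] args) {
            // Stand-in for PredicateBuilder.splitAnd(filter)
            List<String> conjuncts = Arrays.asList("a > 1", "g = 2.5");

            // Stand-in for devolveFilters(..., forData = false): pretend the
            // conjunct on g cannot be cast back and is therefore dropped.
            List<String> devolved = new ArrayList<>();
            for (String p : conjuncts) {
                if (!p.startsWith("g")) {
                    devolved.add(p);
                }
            }

            // Stand-in for PredicateBuilder.and(devolved): the result "a > 1"
            // still selects a superset of the true matches, and the engine
            // re-evaluates the full predicate on the rows it reads.
            System.out.println(
                    devolved.isEmpty()
                            ? "null (keep all files)"
                            : String.join(" AND ", devolved));
        }
    }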
@@ -308,8 +308,8 @@ private List<Predicate> readFilters(
        List<Predicate> dataFilters =
                tableSchema.id() == dataSchema.id()
                        ? filters
-                       : SchemaEvolutionUtil.devolveDataFilters(
-                               tableSchema.fields(), dataSchema.fields(), filters);
+                       : SchemaEvolutionUtil.devolveFilters(
+                               tableSchema.fields(), dataSchema.fields(), filters, true);

// Skip pushing down partition filters to reader.
return excludePredicateWithFields(
@@ -53,6 +53,7 @@
import org.apache.paimon.table.ExpireChangelogImpl;
import org.apache.paimon.table.ExpireSnapshots;
import org.apache.paimon.table.ExpireSnapshotsImpl;
+import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ScanMode;
@@ -126,7 +127,7 @@ private TestFileStore(
                        valueType.getFields(),
                        valueType.getFieldCount(),
                        partitionType.getFieldNames(),
-                       keyType.getFieldNames(),
+                       cleanPrimaryKeys(keyType.getFieldNames()),
                        Collections.emptyMap(),
                        null),
                false,
@@ -148,6 +149,12 @@
        this.commitIdentifier = 0L;
    }

+   private static List<String> cleanPrimaryKeys(List<String> primaryKeys) {
+       return primaryKeys.stream()
+               .map(k -> k.substring(SpecialFields.KEY_FIELD_PREFIX.length()))
+               .collect(Collectors.toList());
+   }
+
    private static SchemaManager schemaManager(String root, CoreOptions options) {
        return new SchemaManager(FileIOFinder.find(new Path(root)), options.path());
    }
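cleanPrimaryKeys exists because the key type in TestFileStore carries Paimon's internal key-field prefix, while the schema's primary keys must be user-facing column names. A small illustration, assuming SpecialFields.KEY_FIELD_PREFIX is the literal "_KEY_" (treat that value as an assumption of this sketch):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;

    public class CleanPrimaryKeysSketch {

        private static final String KEY_FIELD_PREFIX = "_KEY_"; // assumed value

        public static void main(String[] args) {
            List<String> raw = Arrays.asList("_KEY_id", "_KEY_dt");
            List<String> cleaned =
                    raw.stream()
                            .map(k -> k.substring(KEY_FIELD_PREFIX.length()))
                            .collect(Collectors.toList());
            System.out.println(cleaned); // [id, dt]
        }
    }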
@@ -91,8 +91,8 @@ public void testDevolveDataFilters() {
                        IsNull.INSTANCE, DataTypes.INT(), 7, "a", Collections.emptyList()));

        List<Predicate> filters =
-               SchemaEvolutionUtil.devolveDataFilters(tableFields2, dataFields, predicates);
-       assert filters != null;
+               SchemaEvolutionUtil.devolveFilters(tableFields2, dataFields, predicates, true);
+       assertThat(filters).isNotNull();
        assertThat(filters.size()).isEqualTo(1);

        LeafPredicate child1 = (LeafPredicate) filters.get(0);
@@ -119,14 +119,8 @@ public void testTableSplitFilterNormalFields() throws Exception {
                (files, schemas) -> {
                    FileStoreTable table = createFileStoreTable(schemas);

-                   /**
-                    * Filter field "g" in [200, 500] in SCHEMA_FIELDS which is updated from bigint
-                    * to float and will get another file with one data as followed:
-                    *
-                    * <ul>
-                    *   <li>2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410")
-                    * </ul>
-                    */
+                   // Filtering field "g" in [200, 500] in SCHEMA_FIELDS cannot prune old file 1
+                   // and file 2, because the filter is not evolution-safe for their schemas
                    List<Split> splits =
                            toSplits(
                                    table.newSnapshotReader()
@@ -139,8 +133,12 @@
                    List<InternalRow.FieldGetter> fieldGetterList = getFieldGetterList(table);
                    assertThat(getResult(table.newRead(), splits, fieldGetterList))
                            .containsExactlyInAnyOrder(
+                                   // old file1
+                                   "1|100|101|102.0|103|104.00|105.0|106.0|107.00|108|109|110",
+                                   // old file2
                                    "2|200|201|202.0|203|204.00|205.0|206.0|207.00|208|209|210",
                                    "2|300|301|302.0|303|304.00|305.0|306.0|307.00|308|309|310",
+                                   // normal filtered data
                                    "2|400|401|402.0|403|404.00|405.0|406.0|407.00|408|409|410");
                },
                getPrimaryKeyNames(),