@@ -24,8 +24,10 @@
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -100,6 +102,59 @@ public class CastExecutors {
         return IDENTITY_CAST_EXECUTOR;
     }
 
+    /**
+     * If a field's type is modified, pushing down a filter on it is dangerous. This method tries
+     * to cast the literals of the filter to the original type. It only casts the literals when
+     * the CastRule is in the whitelist; otherwise, it returns Optional.empty().
+     */
+    public static Optional<List<Object>> castLiteralsWithEvolution(
+            List<Object> literals, DataType predicateType, DataType dataType) {
+        if (predicateType.equalsIgnoreNullable(dataType)) {
+            return Optional.of(literals);
+        }
+
+        CastRule<?, ?> castRule = INSTANCE.internalResolve(predicateType, dataType);
+        if (castRule == null) {
+            return Optional.empty();
+        }
+
+        if (castRule instanceof NumericPrimitiveCastRule) {
+            // Ignore float literals because the result of pushing down a float filter is
+            // unpredictable. For example, (double) 0.1F in Java is 0.10000000149011612.
+
+            if (predicateType.is(DataTypeFamily.INTEGER_NUMERIC)
+                    && dataType.is(DataTypeFamily.INTEGER_NUMERIC)) {
+                // Ignore input scale < output scale because of overflow.
+                // For example, after altering a column from INT to TINYINT, querying 383 returns
+                // (byte) 383 == 127. Pushing down the filter f = 127 would filter out 383 mistakenly.
+
+                if (integerScaleLargerThan(predicateType.getTypeRoot(), dataType.getTypeRoot())) {
+                    CastExecutor<Number, Number> castExecutor =
+                            (CastExecutor<Number, Number>) castRule.create(predicateType, dataType);
+                    List<Object> newLiterals = new ArrayList<>(literals.size());
+                    for (Object literal : literals) {
+                        Number literalNumber = (Number) literal;
+                        Number newLiteralNumber = castExecutor.cast(literalNumber);
+                        // Give up if any literal overflows.
+                        if (newLiteralNumber.longValue() != literalNumber.longValue()) {
+                            return Optional.empty();
+                        }
+                        newLiterals.add(newLiteralNumber);
+                    }
+                    return Optional.of(newLiterals);
+                }
+            }
+        }
+
+        return Optional.empty();
+    }
+
+    private static boolean integerScaleLargerThan(DataTypeRoot a, DataTypeRoot b) {
+        return (a == DataTypeRoot.SMALLINT && b == DataTypeRoot.TINYINT)
+                || (a == DataTypeRoot.INTEGER && b != DataTypeRoot.BIGINT)
+                || a == DataTypeRoot.BIGINT;
+    }
+
 
     // Map<Target family or root, Map<Input family or root, rule>>
     private final Map<Object, Map<Object, CastRule<?, ?>>> rules = new HashMap<>();
 
@@ -109,7 +109,9 @@ private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry
         Predicate dataPredicate =
                 dataFilterMapping.computeIfAbsent(
                         entry.file().schemaId(),
-                        id -> simpleStatsEvolutions.convertFilter(entry.file().schemaId(), filter));
+                        id ->
+                                simpleStatsEvolutions.tryDevolveFilter(
+                                        entry.file().schemaId(), filter));
 
         try (FileIndexPredicate predicate =
                 new FileIndexPredicate(embeddedIndexBytes, dataRowType)) {
@@ -158,7 +158,7 @@ private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry
                     schemaId2DataFilter.computeIfAbsent(
                             entry.file().schemaId(),
                             id ->
-                                    fieldValueStatsConverters.convertFilter(
+                                    fieldValueStatsConverters.tryDevolveFilter(
                                             entry.file().schemaId(), valueFilter));
             return predicate.evaluate(dataPredicate).remain();
         } catch (IOException e) {
@@ -276,16 +276,18 @@ public static int[][] createDataProjection(
     }
 
     /**
-     * Create predicate list from data fields. We will visit all predicate in filters, reset it's
-     * field index, name and type, and ignore predicate if the field is not exist.
+     * When pushing down filters after schema evolution, we should devolve the literals from the
+     * new types (in tableFields) to the original types (in dataFields). We will visit all
+     * predicates in filters, reset their field index, name and type, and ignore a predicate if
+     * its field does not exist.
      *
      * @param tableFields the table fields
      * @param dataFields the underlying data fields
      * @param filters the filters
      * @return the data filters
      */
     @Nullable
-    public static List<Predicate> createDataFilters(
+    public static List<Predicate> devolveDataFilters(
             List<DataField> tableFields, List<DataField> dataFields, List<Predicate> filters) {
         if (filters == null) {
             return null;
@@ -308,29 +310,16 @@ public static List<Predicate> createDataFilters(
                        return Optional.empty();
                    }
 
-                    DataType dataValueType = dataField.type().copy(true);
-                    DataType predicateType = predicate.type().copy(true);
-                    CastExecutor<Object, Object> castExecutor =
-                            dataValueType.equals(predicateType)
-                                    ? null
-                                    : (CastExecutor<Object, Object>)
-                                            CastExecutors.resolve(
-                                                    predicate.type(), dataField.type());
-                    // Convert value from predicate type to underlying data type which may lose
-                    // information, for example, convert double value to int. But it doesn't matter
-                    // because it just for predicate push down and the data will be filtered
-                    // correctly after reading.
-                    List<Object> literals =
-                            predicate.literals().stream()
-                                    .map(v -> castExecutor == null ? v : castExecutor.cast(v))
-                                    .collect(Collectors.toList());
-                    return Optional.of(
-                            new LeafPredicate(
-                                    predicate.function(),
-                                    dataField.type(),
-                                    indexOf(dataField, idToDataFields),
-                                    dataField.name(),
-                                    literals));
+                    return CastExecutors.castLiteralsWithEvolution(
+                                    predicate.literals(), predicate.type(), dataField.type())
+                            .map(
+                                    literals ->
+                                            new LeafPredicate(
+                                                    predicate.function(),
+                                                    dataField.type(),
+                                                    indexOf(dataField, idToDataFields),
+                                                    dataField.name(),
+                                                    literals));
                };
 
        for (Predicate predicate : filters) {
@@ -77,15 +77,18 @@ public SimpleStatsEvolution getOrCreate(long dataSchemaId) {
                });
    }
 
-    public Predicate convertFilter(long dataSchemaId, Predicate filter) {
-        return tableSchemaId == dataSchemaId
-                ? filter
-                : Objects.requireNonNull(
-                        SchemaEvolutionUtil.createDataFilters(
-                                schemaFields.apply(tableSchemaId),
-                                schemaFields.apply(dataSchemaId),
-                                Collections.singletonList(filter)))
-                        .get(0);
+    @Nullable
+    public Predicate tryDevolveFilter(long dataSchemaId, Predicate filter) {
+        if (tableSchemaId == dataSchemaId) {
+            return filter;
+        }
+        List<Predicate> devolved =
+                Objects.requireNonNull(
+                        SchemaEvolutionUtil.devolveDataFilters(
+                                schemaFields.apply(tableSchemaId),
+                                schemaFields.apply(dataSchemaId),
+                                Collections.singletonList(filter)));
+        return devolved.isEmpty() ? null : devolved.get(0);
+    }
 
     public List<DataField> tableDataFields() {
@@ -290,7 +290,7 @@ private List<Predicate> readFilters(
         List<Predicate> dataFilters =
                 tableSchema.id() == dataSchema.id()
                         ? filters
-                        : SchemaEvolutionUtil.createDataFilters(
+                        : SchemaEvolutionUtil.devolveDataFilters(
                                 tableSchema.fields(), dataSchema.fields(), filters);
 
         // Skip pushing down partition filters to reader.
@@ -20,7 +20,6 @@
 
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.predicate.Equal;
 import org.apache.paimon.predicate.IsNotNull;
 import org.apache.paimon.predicate.IsNull;
 import org.apache.paimon.predicate.LeafPredicate;
@@ -263,7 +262,7 @@ public void testCreateDataProjection() {
     }
 
     @Test
-    public void testCreateDataFilters() {
+    public void testDevolveDataFilters() {
         List<Predicate> predicates = new ArrayList<>();
         predicates.add(
                 new LeafPredicate(
@@ -278,7 +277,7 @@
                         IsNull.INSTANCE, DataTypes.INT(), 7, "a", Collections.emptyList()));
 
         List<Predicate> filters =
-                SchemaEvolutionUtil.createDataFilters(tableFields2, dataFields, predicates);
+                SchemaEvolutionUtil.devolveDataFilters(tableFields2, dataFields, predicates);
         assert filters != null;
         assertThat(filters.size()).isEqualTo(1);
 
@@ -287,27 +286,4 @@ public void testCreateDataFilters() {
         assertThat(child1.fieldName()).isEqualTo("b");
         assertThat(child1.index()).isEqualTo(1);
     }
-
-    @Test
-    public void testColumnTypeFilter() {
-        // (1, b, int) in data schema is updated to (1, c, double) in table2
-        List<Predicate> predicates = new ArrayList<>();
-        predicates.add(
-                new LeafPredicate(
-                        Equal.INSTANCE,
-                        DataTypes.DOUBLE(),
-                        0,
-                        "c",
-                        Collections.singletonList(1.0D)));
-        List<Predicate> filters =
-                SchemaEvolutionUtil.createDataFilters(tableFields2, dataFields, predicates);
-        assert filters != null;
-        assertThat(filters.size()).isEqualTo(1);
-
-        LeafPredicate child = (LeafPredicate) filters.get(0);
-        // Validate value 1 with index 1
-        assertThat(child.test(GenericRow.of(0, 1))).isTrue();
-        // Validate value 2 with index 1
-        assertThat(child.test(GenericRow.of(1, 2))).isFalse();
-    }
 }