From d687110f35bf79476b9dd33138aad86abe970cfd Mon Sep 17 00:00:00 2001 From: yuzelin Date: Mon, 16 Dec 2024 12:19:49 +0800 Subject: [PATCH 1/5] [core] Fix numeric literals cast in filter pushdown after schema evolution --- .../apache/paimon/casting/CastExecutors.java | 156 +++++++++++++ .../operation/AppendOnlyFileStoreScan.java | 4 +- .../operation/KeyValueFileStoreScan.java | 2 +- .../paimon/schema/SchemaEvolutionUtil.java | 44 ++-- .../paimon/stats/SimpleStatsEvolutions.java | 21 +- .../paimon/utils/FormatReaderMapping.java | 2 +- .../schema/SchemaEvolutionUtilTest.java | 26 ++- .../FilterPushdownWithSchemaChangeITCase.java | 206 ++++++++++++++++++ 8 files changed, 412 insertions(+), 49 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java index 8134e0118bf8..e9daef26d276 100644 --- a/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java +++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java @@ -18,17 +18,32 @@ package org.apache.paimon.casting; +import org.apache.paimon.data.Decimal; +import org.apache.paimon.predicate.Equal; +import org.apache.paimon.predicate.GreaterOrEqual; +import org.apache.paimon.predicate.GreaterThan; +import org.apache.paimon.predicate.In; +import org.apache.paimon.predicate.LeafFunction; +import org.apache.paimon.predicate.LeafPredicate; +import org.apache.paimon.predicate.LessOrEqual; +import org.apache.paimon.predicate.LessThan; +import org.apache.paimon.predicate.NotEqual; +import org.apache.paimon.predicate.NotIn; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeFamily; import org.apache.paimon.types.DataTypeRoot; +import org.apache.paimon.types.DecimalType; import javax.annotation.Nullable; +import java.math.BigDecimal; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import java.util.stream.Stream; /** Cast executors for input type and output type. */ @@ -100,6 +115,147 @@ public class CastExecutors { return IDENTITY_CAST_EXECUTOR; } + /** + * When filter a field witch was evolved from/to a numeric type, we should carefully handle the + * precision match and overflow problem. + */ + @Nullable + public static List safelyCastLiteralsWithNumericEvolution( + LeafPredicate predicate, DataType outputType) { + DataType inputType = predicate.type(); + if (inputType.equalsIgnoreNullable(outputType)) { + return predicate.literals(); + } + + List literals = predicate.literals(); + + CastRule castRule = INSTANCE.internalResolve(inputType, outputType); + if (castRule == null) { + return literals; + } + + if (castRule instanceof DecimalToDecimalCastRule) { + if (((DecimalType) inputType).getPrecision() < ((DecimalType) outputType).getPrecision() + && containsEqualCheck(predicate)) { + // For example, alter 111.321 from DECIMAL(6, 3) to DECIMAL(5, 2). + // The query result is 111.32 which is truncated from 111.321. + // If we query with filter f = 111.32 and push down it, 111.321 will be filtered + // out mistakenly. + // But if we query with filter f > 111.32, although 111.321 will be retrieved, + // the engine will filter out it finally. + return null; + } + // Pushing down higher precision filter is always correct. + return literals; + } else if (castRule instanceof NumericPrimitiveToDecimalCastRule) { + if (inputType.is(DataTypeFamily.INTEGER_NUMERIC) && containsEqualCheck(predicate)) { + // the reason is same as DecimalToDecimalCastRule + return null; + } + return literals.stream() + .map(literal -> (Number) literal) + .map( + literal -> + inputType.is(DataTypeFamily.INTEGER_NUMERIC) + ? BigDecimal.valueOf(literal.longValue()) + : BigDecimal.valueOf(literal.doubleValue())) + .map(bd -> Decimal.fromBigDecimal(bd, bd.precision(), bd.scale())) + .collect(Collectors.toList()); + } else if (castRule instanceof DecimalToNumericPrimitiveCastRule) { + if (outputType.is(DataTypeFamily.INTEGER_NUMERIC) + && (containsPartialCheck(predicate) || containsNotEqualCheck(predicate))) { + // For example, alter 111 from INT to DECIMAL(5, 2). The query result is 111.00 + // If we query with filter f < 111.01 and push down it as f < 111, 111 will be + // filtered out mistakenly. Also, we shouldn't push down f <> 111.01. + // But if we query with filter f = 111.01 and push down it as f = 111, although 111 + // will be retrieved, the engine will filter out it finally. + // TODO: maybe we can scale the partial filter. For example, f < 111.01 can be + // transfer to f < 112. + return null; + } else if (outputType.is(DataTypeFamily.APPROXIMATE_NUMERIC) + && containsEqualCheck(predicate)) { + // For example, alter 111.321 from DOUBLE to DECIMAL(5, 2). The query result is + // 111.32. + // If we query with filter f = 111.32 and push down it, 111.321 will be filtered + // out mistakenly. + // But if we query with filter f > 111.32 or f <> 111.32, although 111.321 will be + // retrieved, the engine will filter out it finally. + return null; + } + castLiterals(castRule, inputType, outputType, literals); + } else if (castRule instanceof NumericPrimitiveCastRule) { + if (inputType.is(DataTypeFamily.INTEGER_NUMERIC) + && outputType.is(DataTypeFamily.INTEGER_NUMERIC)) { + if (integerScaleLargerThan(inputType.getTypeRoot(), outputType.getTypeRoot())) { + // Pushing down higher scale integer numeric filter is always correct. + return literals; + } + } + + // Pushing down float filter is dangerous because the filter result is unpredictable. + // For example, (double) 0.1F in Java is 0.10000000149011612. + + // Pushing down lower scale filter is also dangerous because of overflow. + // For example, alter 383 from INT to TINYINT, the query result is (byte) 383 == 127. + // If we push down filter f = 127, 383 will be filtered out which is wrong. + + // So we don't push down these filters. + return null; + } else if (castRule instanceof NumericToStringCastRule + || castRule instanceof StringToDecimalCastRule + || castRule instanceof StringToNumericPrimitiveCastRule) { + // Pushing down filters related to STRING is dangerous because string comparison is + // different from number comparison and string literal to number might have precision + // and overflow problem. + // For example, alter '111' from STRING to INT, the query result is 111. + // If we query with filter f > 2 and push down it as f > '2', '111' will be filtered + // out mistakenly. + return null; + } + + // Non numeric related cast rule + return castLiterals(castRule, inputType, outputType, literals); + } + + private static List castLiterals( + CastRule castRule, + DataType inputType, + DataType outputType, + List literals) { + CastExecutor castExecutor = + (CastExecutor) castRule.create(inputType, outputType); + return literals.stream() + .map(l -> castExecutor == null ? l : castExecutor.cast(l)) + .collect(Collectors.toList()); + } + + private static boolean containsEqualCheck(LeafPredicate predicate) { + LeafFunction function = predicate.function(); + return function instanceof In + || function instanceof Equal + || function instanceof GreaterOrEqual + || function instanceof LessOrEqual; + } + + private static boolean containsPartialCheck(LeafPredicate predicate) { + LeafFunction function = predicate.function(); + return function instanceof LessThan + || function instanceof LessOrEqual + || function instanceof GreaterThan + || function instanceof GreaterOrEqual; + } + + private static boolean containsNotEqualCheck(LeafPredicate predicate) { + LeafFunction function = predicate.function(); + return function instanceof NotIn || function instanceof NotEqual; + } + + private static boolean integerScaleLargerThan(DataTypeRoot a, DataTypeRoot b) { + return (a == DataTypeRoot.SMALLINT && b == DataTypeRoot.TINYINT) + || (a == DataTypeRoot.INTEGER && b != DataTypeRoot.BIGINT) + || a == DataTypeRoot.BIGINT; + } + // Map> private final Map>> rules = new HashMap<>(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index d2ca5da42249..1498e08a2b20 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -109,7 +109,9 @@ private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry Predicate dataPredicate = dataFilterMapping.computeIfAbsent( entry.file().schemaId(), - id -> simpleStatsEvolutions.convertFilter(entry.file().schemaId(), filter)); + id -> + simpleStatsEvolutions.tryDevolveFilter( + entry.file().schemaId(), filter)); try (FileIndexPredicate predicate = new FileIndexPredicate(embeddedIndexBytes, dataRowType)) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index 8d8c51996cfe..e39ad2e3c2e7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -158,7 +158,7 @@ private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestE schemaId2DataFilter.computeIfAbsent( entry.file().schemaId(), id -> - fieldValueStatsConverters.convertFilter( + fieldValueStatsConverters.tryDevolveFilter( entry.file().schemaId(), valueFilter)); return predicate.evaluate(dataPredicate).remain(); } catch (IOException e) { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java index 0ae2798c29e0..a827bf65d908 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java @@ -276,8 +276,10 @@ public static int[][] createDataProjection( } /** - * Create predicate list from data fields. We will visit all predicate in filters, reset it's - * field index, name and type, and ignore predicate if the field is not exist. + * When pushing down filters after schema evolution, we should devolve the literals from new + * types (in dataFields) to original types (in tableFields). We will visit all predicate in + * filters, reset its field index, name and type, and ignore predicate if the field is not + * exist. * * @param tableFields the table fields * @param dataFields the underlying data fields @@ -285,7 +287,7 @@ public static int[][] createDataProjection( * @return the data filters */ @Nullable - public static List createDataFilters( + public static List devolveDataFilters( List tableFields, List dataFields, List filters) { if (filters == null) { return null; @@ -308,29 +310,19 @@ public static List createDataFilters( return Optional.empty(); } - DataType dataValueType = dataField.type().copy(true); - DataType predicateType = predicate.type().copy(true); - CastExecutor castExecutor = - dataValueType.equals(predicateType) - ? null - : (CastExecutor) - CastExecutors.resolve( - predicate.type(), dataField.type()); - // Convert value from predicate type to underlying data type which may lose - // information, for example, convert double value to int. But it doesn't matter - // because it just for predicate push down and the data will be filtered - // correctly after reading. - List literals = - predicate.literals().stream() - .map(v -> castExecutor == null ? v : castExecutor.cast(v)) - .collect(Collectors.toList()); - return Optional.of( - new LeafPredicate( - predicate.function(), - dataField.type(), - indexOf(dataField, idToDataFields), - dataField.name(), - literals)); + List evolvedLiterals = + CastExecutors.safelyCastLiteralsWithNumericEvolution( + predicate, dataField.type()); + + return evolvedLiterals == null + ? Optional.empty() + : Optional.of( + new LeafPredicate( + predicate.function(), + dataField.type(), + indexOf(dataField, idToDataFields), + dataField.name(), + evolvedLiterals)); }; for (Predicate predicate : filters) { diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java index a0814b8c04c4..3277aa2bb102 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java @@ -77,15 +77,18 @@ public SimpleStatsEvolution getOrCreate(long dataSchemaId) { }); } - public Predicate convertFilter(long dataSchemaId, Predicate filter) { - return tableSchemaId == dataSchemaId - ? filter - : Objects.requireNonNull( - SchemaEvolutionUtil.createDataFilters( - schemaFields.apply(tableSchemaId), - schemaFields.apply(dataSchemaId), - Collections.singletonList(filter))) - .get(0); + @Nullable + public Predicate tryDevolveFilter(long dataSchemaId, Predicate filter) { + if (tableSchemaId == dataSchemaId) { + return filter; + } + List evolved = + Objects.requireNonNull( + SchemaEvolutionUtil.devolveDataFilters( + schemaFields.apply(tableSchemaId), + schemaFields.apply(dataSchemaId), + Collections.singletonList(filter))); + return evolved.isEmpty() ? null : evolved.get(0); } public List tableDataFields() { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java index f6c6287f51b4..00554b233c59 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java @@ -290,7 +290,7 @@ private List readFilters( List dataFilters = tableSchema.id() == dataSchema.id() ? filters - : SchemaEvolutionUtil.createDataFilters( + : SchemaEvolutionUtil.devolveDataFilters( tableSchema.fields(), dataSchema.fields(), filters); // Skip pushing down partition filters to reader. diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java index 9d947f76d995..2193d2358022 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java @@ -18,6 +18,7 @@ package org.apache.paimon.schema; +import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.predicate.Equal; @@ -37,6 +38,7 @@ import org.junit.jupiter.api.Test; +import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -263,7 +265,7 @@ public void testCreateDataProjection() { } @Test - public void testCreateDataFilters() { + public void testEvolveDataFilters() { List predicates = new ArrayList<>(); predicates.add( new LeafPredicate( @@ -278,7 +280,7 @@ public void testCreateDataFilters() { IsNull.INSTANCE, DataTypes.INT(), 7, "a", Collections.emptyList())); List filters = - SchemaEvolutionUtil.createDataFilters(tableFields2, dataFields, predicates); + SchemaEvolutionUtil.devolveDataFilters(tableFields2, dataFields, predicates); assert filters != null; assertThat(filters.size()).isEqualTo(1); @@ -290,24 +292,26 @@ public void testCreateDataFilters() { @Test public void testColumnTypeFilter() { - // (1, b, int) in data schema is updated to (1, c, double) in table2 + // alter d from INT to DECIMAL(10, 2) + // filter d = 11.01 will be devolved to d = 11 List predicates = new ArrayList<>(); predicates.add( new LeafPredicate( Equal.INSTANCE, - DataTypes.DOUBLE(), + DataTypes.DECIMAL(10, 2), 0, - "c", - Collections.singletonList(1.0D))); + "d", + Collections.singletonList( + Decimal.fromBigDecimal(new BigDecimal("11.01"), 10, 2)))); List filters = - SchemaEvolutionUtil.createDataFilters(tableFields2, dataFields, predicates); + SchemaEvolutionUtil.devolveDataFilters(tableFields2, dataFields, predicates); assert filters != null; assertThat(filters.size()).isEqualTo(1); LeafPredicate child = (LeafPredicate) filters.get(0); - // Validate value 1 with index 1 - assertThat(child.test(GenericRow.of(0, 1))).isTrue(); - // Validate value 2 with index 1 - assertThat(child.test(GenericRow.of(1, 2))).isFalse(); + // Validate value 11 with index 3 + assertThat(child.test(GenericRow.of(0, 0, 0, 11))).isTrue(); + // Validate value 12 with index 3 + assertThat(child.test(GenericRow.of(1, 0, 0, 12))).isFalse(); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java new file mode 100644 index 000000000000..2acc260abd7f --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink; + +import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension; +import org.apache.paimon.testutils.junit.parameterized.Parameters; + +import org.apache.flink.types.Row; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** TODO all file format tests. */ +@ExtendWith(ParameterizedTestExtension.class) +public class FilterPushdownWithSchemaChangeITCase extends CatalogITCaseBase { + + private final String fileFormat; + + public FilterPushdownWithSchemaChangeITCase(String fileFormat) { + this.fileFormat = fileFormat; + } + + @SuppressWarnings("unused") + @Parameters(name = "file-format = {0}") + public static List fileFormat() { + return Arrays.asList("parquet", "orc", "avro"); + } + + @TestTemplate + public void testDecimalToDecimal() { + // to higher precision + sql( + "CREATE TABLE T (" + + " id INT," + + " f DECIMAL(5, 2)" + + ") with (" + + " 'file.format' = '%s'" + + ")", + fileFormat); + sql("INSERT INTO T VALUES (1, 111.32)"); + sql("ALTER TABLE T MODIFY (f DECIMAL(6, 3))"); + assertThat(sql("SELECT * FROM T WHERE f < 111.321")) + .containsExactly(Row.of(1, new BigDecimal("111.320"))); + assertThat(sql("SELECT * FROM T WHERE f = 111.321")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f = 111.320")) + .containsExactly(Row.of(1, new BigDecimal("111.320"))); + assertThat(sql("SELECT * FROM T WHERE f <> 111.321")) + .containsExactly(Row.of(1, new BigDecimal("111.320"))); + + sql("DROP TABLE T"); + + // to lower precision + sql( + "CREATE TABLE T (" + + " id INT," + + " f DECIMAL(6, 3)" + + ") with (" + + " 'file.format' = '%s'" + + ")", + fileFormat); + sql("INSERT INTO T VALUES (1, 111.321), (2, 111.331)"); + sql("ALTER TABLE T MODIFY (f DECIMAL(5, 2))"); + assertThat(sql("SELECT * FROM T WHERE f > 111.32")) + .containsExactly(Row.of(2, new BigDecimal("111.33"))); + assertThat(sql("SELECT * FROM T WHERE f = 111.32")) + .containsExactly(Row.of(1, new BigDecimal("111.32"))); + assertThat(sql("SELECT * FROM T WHERE f <> 111.32")) + .containsExactly(Row.of(2, new BigDecimal("111.33"))); + } + + @TestTemplate + public void testNumericPrimitiveToDecimal() { + String ddl = + "CREATE TABLE T (" + + " id INT," + + " f DECIMAL(5, 2)" + + ") with (" + + " 'file.format' = '%s'" + + ")"; + + // to higher precision + sql(ddl, fileFormat); + sql("INSERT INTO T VALUES (1, 111.32)"); + sql("ALTER TABLE T MODIFY (f DOUBLE)"); + assertThat(sql("SELECT * FROM T WHERE f < 111.321")).containsExactly(Row.of(1, 111.32)); + assertThat(sql("SELECT * FROM T WHERE f = 111.321")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f = 111.320")).containsExactly(Row.of(1, 111.32)); + assertThat(sql("SELECT * FROM T WHERE f <> 111.321")).containsExactly(Row.of(1, 111.32)); + + sql("DROP TABLE T"); + + // to lower precision + sql(ddl, fileFormat); + sql("INSERT INTO T VALUES (1, 111.32), (2, 112.33)"); + sql("ALTER TABLE T MODIFY (f INT)"); + assertThat(sql("SELECT * FROM T WHERE f < 112")).containsExactly(Row.of(1, 111)); + assertThat(sql("SELECT * FROM T WHERE f > 112")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f = 111")).containsExactly(Row.of(1, 111)); + assertThat(sql("SELECT * FROM T WHERE f <> 111")).containsExactly(Row.of(2, 112)); + } + + @TestTemplate + public void testDecimalToNumericPrimitive() { + // to higher precision + sql( + "CREATE TABLE T (" + + " id INT," + + " f INT" + + ") with (" + + " 'file.format' = '%s'" + + ")", + fileFormat); + sql("INSERT INTO T VALUES (1, 111)"); + sql("ALTER TABLE T MODIFY (f DECIMAL(5, 2))"); + assertThat(sql("SELECT * FROM T WHERE f < 111.01")) + .containsExactly(Row.of(1, new BigDecimal("111.00"))); + assertThat(sql("SELECT * FROM T WHERE f = 111.01")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f = 111.00")) + .containsExactly(Row.of(1, new BigDecimal("111.00"))); + assertThat(sql("SELECT * FROM T WHERE f <> 111.01")) + .containsExactly(Row.of(1, new BigDecimal("111.00"))); + + sql("DROP TABLE T"); + + // to lower precision + sql( + "CREATE TABLE T (" + + " id INT," + + " f DOUBLE" + + ") with (" + + " 'file.format' = '%s'" + + ")", + fileFormat); + sql("INSERT INTO T VALUES (1, 111.321), (2, 111.331)"); + sql("ALTER TABLE T MODIFY (f DECIMAL(5, 2))"); + assertThat(sql("SELECT * FROM T WHERE f > 111.32")) + .containsExactly(Row.of(2, new BigDecimal("111.33"))); + assertThat(sql("SELECT * FROM T WHERE f = 111.32")) + .containsExactly(Row.of(1, new BigDecimal("111.32"))); + assertThat(sql("SELECT * FROM T WHERE f <> 111.32")) + .containsExactly(Row.of(2, new BigDecimal("111.33"))); + } + + @TestTemplate + public void testNumericPrimitive() { + // no checks for high scale to low scale because we don't pushdown it + + // integer to higher scale integer + sql( + "CREATE TABLE T (" + + " id INT," + + " f TINYINT" + + ") with (" + + " 'file.format' = '%s'" + + ")", + fileFormat); + sql("INSERT INTO T VALUES (1, CAST (127 AS TINYINT))"); + sql("ALTER TABLE T MODIFY (f INT)"); + // (byte) 383 == 127 + assertThat(sql("SELECT * FROM T WHERE f < 128")).containsExactly(Row.of(1, 127)); + assertThat(sql("SELECT * FROM T WHERE f < 383")).containsExactly(Row.of(1, 127)); + assertThat(sql("SELECT * FROM T WHERE f = 127")).containsExactly(Row.of(1, 127)); + assertThat(sql("SELECT * FROM T WHERE f = 383")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f <> 127")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f <> 383")).containsExactly(Row.of(1, 127)); + } + + @TestTemplate + public void testNumericToString() { + // no more string related tests because we don't push down it + sql( + "CREATE TABLE T (" + + " id INT," + + " f STRING" + + ") with (" + + " 'file.format' = '%s'" + + ");", + fileFormat); + sql("INSERT INTO T VALUES (1, '1'), (2, '111')"); + sql("ALTER TABLE T MODIFY (f INT)"); + assertThat(sql("SELECT * FROM T WHERE f > 2")).containsExactly(Row.of(2, 111)); + assertThat(sql("SELECT * FROM T WHERE f = 1")).containsExactly(Row.of(1, 1)); + assertThat(sql("SELECT * FROM T WHERE f <> 1")).containsExactly(Row.of(2, 111)); + } +} From d71e9463602cbea37376cec94008b02817c8d3e7 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Mon, 16 Dec 2024 13:53:35 +0800 Subject: [PATCH 2/5] fix --- .../java/org/apache/paimon/schema/SchemaEvolutionUtil.java | 6 +++--- .../java/org/apache/paimon/stats/SimpleStatsEvolutions.java | 4 ++-- .../org/apache/paimon/schema/SchemaEvolutionUtilTest.java | 2 +- .../paimon/flink/FilterPushdownWithSchemaChangeITCase.java | 3 ++- 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java index a827bf65d908..83559c2b0009 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java @@ -310,11 +310,11 @@ public static List devolveDataFilters( return Optional.empty(); } - List evolvedLiterals = + List devolvedLiterals = CastExecutors.safelyCastLiteralsWithNumericEvolution( predicate, dataField.type()); - return evolvedLiterals == null + return devolvedLiterals == null ? Optional.empty() : Optional.of( new LeafPredicate( @@ -322,7 +322,7 @@ public static List devolveDataFilters( dataField.type(), indexOf(dataField, idToDataFields), dataField.name(), - evolvedLiterals)); + devolvedLiterals)); }; for (Predicate predicate : filters) { diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java index 3277aa2bb102..566cae9e6592 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java @@ -82,13 +82,13 @@ public Predicate tryDevolveFilter(long dataSchemaId, Predicate filter) { if (tableSchemaId == dataSchemaId) { return filter; } - List evolved = + List devolved = Objects.requireNonNull( SchemaEvolutionUtil.devolveDataFilters( schemaFields.apply(tableSchemaId), schemaFields.apply(dataSchemaId), Collections.singletonList(filter))); - return evolved.isEmpty() ? null : evolved.get(0); + return devolved.isEmpty() ? null : devolved.get(0); } public List tableDataFields() { diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java index 2193d2358022..c0ce5291326d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java @@ -265,7 +265,7 @@ public void testCreateDataProjection() { } @Test - public void testEvolveDataFilters() { + public void testDevolveDataFilters() { List predicates = new ArrayList<>(); predicates.add( new LeafPredicate( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java index 2acc260abd7f..7c518165192f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink; +import org.apache.paimon.casting.CastExecutors; import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension; import org.apache.paimon.testutils.junit.parameterized.Parameters; @@ -31,7 +32,7 @@ import static org.assertj.core.api.Assertions.assertThat; -/** TODO all file format tests. */ +/** ITCase for {@link CastExecutors#safelyCastLiteralsWithNumericEvolution}. */ @ExtendWith(ParameterizedTestExtension.class) public class FilterPushdownWithSchemaChangeITCase extends CatalogITCaseBase { From 1c6a37a0ab17d70ce8a135a9497a8adb42de9e9d Mon Sep 17 00:00:00 2001 From: yuzelin Date: Tue, 17 Dec 2024 16:14:28 +0800 Subject: [PATCH 3/5] fix comments --- .../apache/paimon/casting/CastExecutors.java | 140 ++---------------- .../paimon/schema/SchemaEvolutionUtil.java | 23 ++- .../schema/SchemaEvolutionUtilTest.java | 28 ---- .../FilterPushdownWithSchemaChangeITCase.java | 3 +- 4 files changed, 22 insertions(+), 172 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java index e9daef26d276..2cb7319faabc 100644 --- a/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java +++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java @@ -18,32 +18,18 @@ package org.apache.paimon.casting; -import org.apache.paimon.data.Decimal; -import org.apache.paimon.predicate.Equal; -import org.apache.paimon.predicate.GreaterOrEqual; -import org.apache.paimon.predicate.GreaterThan; -import org.apache.paimon.predicate.In; -import org.apache.paimon.predicate.LeafFunction; -import org.apache.paimon.predicate.LeafPredicate; -import org.apache.paimon.predicate.LessOrEqual; -import org.apache.paimon.predicate.LessThan; -import org.apache.paimon.predicate.NotEqual; -import org.apache.paimon.predicate.NotIn; import org.apache.paimon.types.DataType; import org.apache.paimon.types.DataTypeFamily; import org.apache.paimon.types.DataTypeRoot; -import org.apache.paimon.types.DecimalType; import javax.annotation.Nullable; -import java.math.BigDecimal; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.stream.Collectors; import java.util.stream.Stream; /** Cast executors for input type and output type. */ @@ -116,138 +102,32 @@ public class CastExecutors { } /** - * When filter a field witch was evolved from/to a numeric type, we should carefully handle the - * precision match and overflow problem. + * If a field type is modified, pushing down a filter of it is dangerous. This method tries to + * cast the literals of filter to its original type. It only cast the literals when the CastRule + * is in whitelist. Otherwise, return Optional.empty(). */ - @Nullable - public static List safelyCastLiteralsWithNumericEvolution( - LeafPredicate predicate, DataType outputType) { - DataType inputType = predicate.type(); + public static Optional> castLiteralsWithEvolution( + List literals, DataType inputType, DataType outputType) { if (inputType.equalsIgnoreNullable(outputType)) { - return predicate.literals(); + return Optional.of(literals); } - List literals = predicate.literals(); - CastRule castRule = INSTANCE.internalResolve(inputType, outputType); if (castRule == null) { - return literals; + return Optional.empty(); } - if (castRule instanceof DecimalToDecimalCastRule) { - if (((DecimalType) inputType).getPrecision() < ((DecimalType) outputType).getPrecision() - && containsEqualCheck(predicate)) { - // For example, alter 111.321 from DECIMAL(6, 3) to DECIMAL(5, 2). - // The query result is 111.32 which is truncated from 111.321. - // If we query with filter f = 111.32 and push down it, 111.321 will be filtered - // out mistakenly. - // But if we query with filter f > 111.32, although 111.321 will be retrieved, - // the engine will filter out it finally. - return null; - } - // Pushing down higher precision filter is always correct. - return literals; - } else if (castRule instanceof NumericPrimitiveToDecimalCastRule) { - if (inputType.is(DataTypeFamily.INTEGER_NUMERIC) && containsEqualCheck(predicate)) { - // the reason is same as DecimalToDecimalCastRule - return null; - } - return literals.stream() - .map(literal -> (Number) literal) - .map( - literal -> - inputType.is(DataTypeFamily.INTEGER_NUMERIC) - ? BigDecimal.valueOf(literal.longValue()) - : BigDecimal.valueOf(literal.doubleValue())) - .map(bd -> Decimal.fromBigDecimal(bd, bd.precision(), bd.scale())) - .collect(Collectors.toList()); - } else if (castRule instanceof DecimalToNumericPrimitiveCastRule) { - if (outputType.is(DataTypeFamily.INTEGER_NUMERIC) - && (containsPartialCheck(predicate) || containsNotEqualCheck(predicate))) { - // For example, alter 111 from INT to DECIMAL(5, 2). The query result is 111.00 - // If we query with filter f < 111.01 and push down it as f < 111, 111 will be - // filtered out mistakenly. Also, we shouldn't push down f <> 111.01. - // But if we query with filter f = 111.01 and push down it as f = 111, although 111 - // will be retrieved, the engine will filter out it finally. - // TODO: maybe we can scale the partial filter. For example, f < 111.01 can be - // transfer to f < 112. - return null; - } else if (outputType.is(DataTypeFamily.APPROXIMATE_NUMERIC) - && containsEqualCheck(predicate)) { - // For example, alter 111.321 from DOUBLE to DECIMAL(5, 2). The query result is - // 111.32. - // If we query with filter f = 111.32 and push down it, 111.321 will be filtered - // out mistakenly. - // But if we query with filter f > 111.32 or f <> 111.32, although 111.321 will be - // retrieved, the engine will filter out it finally. - return null; - } - castLiterals(castRule, inputType, outputType, literals); - } else if (castRule instanceof NumericPrimitiveCastRule) { + if (castRule instanceof NumericPrimitiveCastRule) { if (inputType.is(DataTypeFamily.INTEGER_NUMERIC) && outputType.is(DataTypeFamily.INTEGER_NUMERIC)) { if (integerScaleLargerThan(inputType.getTypeRoot(), outputType.getTypeRoot())) { // Pushing down higher scale integer numeric filter is always correct. - return literals; + return Optional.of(literals); } } - - // Pushing down float filter is dangerous because the filter result is unpredictable. - // For example, (double) 0.1F in Java is 0.10000000149011612. - - // Pushing down lower scale filter is also dangerous because of overflow. - // For example, alter 383 from INT to TINYINT, the query result is (byte) 383 == 127. - // If we push down filter f = 127, 383 will be filtered out which is wrong. - - // So we don't push down these filters. - return null; - } else if (castRule instanceof NumericToStringCastRule - || castRule instanceof StringToDecimalCastRule - || castRule instanceof StringToNumericPrimitiveCastRule) { - // Pushing down filters related to STRING is dangerous because string comparison is - // different from number comparison and string literal to number might have precision - // and overflow problem. - // For example, alter '111' from STRING to INT, the query result is 111. - // If we query with filter f > 2 and push down it as f > '2', '111' will be filtered - // out mistakenly. - return null; } - // Non numeric related cast rule - return castLiterals(castRule, inputType, outputType, literals); - } - - private static List castLiterals( - CastRule castRule, - DataType inputType, - DataType outputType, - List literals) { - CastExecutor castExecutor = - (CastExecutor) castRule.create(inputType, outputType); - return literals.stream() - .map(l -> castExecutor == null ? l : castExecutor.cast(l)) - .collect(Collectors.toList()); - } - - private static boolean containsEqualCheck(LeafPredicate predicate) { - LeafFunction function = predicate.function(); - return function instanceof In - || function instanceof Equal - || function instanceof GreaterOrEqual - || function instanceof LessOrEqual; - } - - private static boolean containsPartialCheck(LeafPredicate predicate) { - LeafFunction function = predicate.function(); - return function instanceof LessThan - || function instanceof LessOrEqual - || function instanceof GreaterThan - || function instanceof GreaterOrEqual; - } - - private static boolean containsNotEqualCheck(LeafPredicate predicate) { - LeafFunction function = predicate.function(); - return function instanceof NotIn || function instanceof NotEqual; + return Optional.empty(); } private static boolean integerScaleLargerThan(DataTypeRoot a, DataTypeRoot b) { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java index 83559c2b0009..cab5dcaeb8ad 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java @@ -310,19 +310,16 @@ public static List devolveDataFilters( return Optional.empty(); } - List devolvedLiterals = - CastExecutors.safelyCastLiteralsWithNumericEvolution( - predicate, dataField.type()); - - return devolvedLiterals == null - ? Optional.empty() - : Optional.of( - new LeafPredicate( - predicate.function(), - dataField.type(), - indexOf(dataField, idToDataFields), - dataField.name(), - devolvedLiterals)); + return CastExecutors.castLiteralsWithEvolution( + predicate.literals(), predicate.type(), dataField.type()) + .map( + literals -> + new LeafPredicate( + predicate.function(), + dataField.type(), + indexOf(dataField, idToDataFields), + dataField.name(), + literals)); }; for (Predicate predicate : filters) { diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java index c0ce5291326d..30d844e6c606 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java @@ -18,10 +18,8 @@ package org.apache.paimon.schema; -import org.apache.paimon.data.Decimal; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.predicate.Equal; import org.apache.paimon.predicate.IsNotNull; import org.apache.paimon.predicate.IsNull; import org.apache.paimon.predicate.LeafPredicate; @@ -38,7 +36,6 @@ import org.junit.jupiter.api.Test; -import java.math.BigDecimal; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -289,29 +286,4 @@ public void testDevolveDataFilters() { assertThat(child1.fieldName()).isEqualTo("b"); assertThat(child1.index()).isEqualTo(1); } - - @Test - public void testColumnTypeFilter() { - // alter d from INT to DECIMAL(10, 2) - // filter d = 11.01 will be devolved to d = 11 - List predicates = new ArrayList<>(); - predicates.add( - new LeafPredicate( - Equal.INSTANCE, - DataTypes.DECIMAL(10, 2), - 0, - "d", - Collections.singletonList( - Decimal.fromBigDecimal(new BigDecimal("11.01"), 10, 2)))); - List filters = - SchemaEvolutionUtil.devolveDataFilters(tableFields2, dataFields, predicates); - assert filters != null; - assertThat(filters.size()).isEqualTo(1); - - LeafPredicate child = (LeafPredicate) filters.get(0); - // Validate value 11 with index 3 - assertThat(child.test(GenericRow.of(0, 0, 0, 11))).isTrue(); - // Validate value 12 with index 3 - assertThat(child.test(GenericRow.of(1, 0, 0, 12))).isFalse(); - } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java index 7c518165192f..277c68746d58 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java @@ -21,6 +21,7 @@ import org.apache.paimon.casting.CastExecutors; import org.apache.paimon.testutils.junit.parameterized.ParameterizedTestExtension; import org.apache.paimon.testutils.junit.parameterized.Parameters; +import org.apache.paimon.types.DataType; import org.apache.flink.types.Row; import org.junit.jupiter.api.TestTemplate; @@ -32,7 +33,7 @@ import static org.assertj.core.api.Assertions.assertThat; -/** ITCase for {@link CastExecutors#safelyCastLiteralsWithNumericEvolution}. */ +/** ITCase for {@link CastExecutors#castLiteralsWithEvolution(List, DataType, DataType)}. */ @ExtendWith(ParameterizedTestExtension.class) public class FilterPushdownWithSchemaChangeITCase extends CatalogITCaseBase { From a2c10d75d8372500e4c9b162fd2e2a6fb611550f Mon Sep 17 00:00:00 2001 From: yuzelin Date: Wed, 18 Dec 2024 11:02:45 +0800 Subject: [PATCH 4/5] fix comments --- .../apache/paimon/casting/CastExecutors.java | 27 +++++++++++++++++-- .../FilterPushdownWithSchemaChangeITCase.java | 25 +++++++++++++++++ 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java index 2cb7319faabc..b0d9524adc17 100644 --- a/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java +++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java @@ -24,6 +24,7 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -118,11 +119,33 @@ public static Optional> castLiteralsWithEvolution( } if (castRule instanceof NumericPrimitiveCastRule) { + // Ignore float literals because pushing down float filter result is unpredictable. + // For example, (double) 0.1F in Java is 0.10000000149011612. + if (inputType.is(DataTypeFamily.INTEGER_NUMERIC) && outputType.is(DataTypeFamily.INTEGER_NUMERIC)) { + // Ignore input scale < output scale because of overflow. + // For example, alter 383 from INT to TINYINT, the query result is (byte) 383 == + // 127. If we push down filter f = 127, 383 will be filtered out mistakenly. + if (integerScaleLargerThan(inputType.getTypeRoot(), outputType.getTypeRoot())) { - // Pushing down higher scale integer numeric filter is always correct. - return Optional.of(literals); + if (inputType.getTypeRoot() != DataTypeRoot.BIGINT) { + return Optional.of(literals); + } + + // Parquet filter Int comparator cannot handle long value. + // See org.apache.parquet.schema.PrimitiveType. + // So ignore filter if long literal is out of int scale. + List newLiterals = new ArrayList<>(literals.size()); + for (Object literal : literals) { + long originalValue = (long) literal; + int newValue = (int) originalValue; + if (originalValue != newValue) { + return Optional.empty(); + } + newLiterals.add(newValue); + } + return Optional.of(newLiterals); } } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java index 277c68746d58..3b12ceabe2da 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java @@ -186,6 +186,31 @@ public void testNumericPrimitive() { assertThat(sql("SELECT * FROM T WHERE f = 383")).isEmpty(); assertThat(sql("SELECT * FROM T WHERE f <> 127")).isEmpty(); assertThat(sql("SELECT * FROM T WHERE f <> 383")).containsExactly(Row.of(1, 127)); + + sql("DROP TABLE T"); + + // INT to BIGINT + sql( + "CREATE TABLE T (" + + " id INT," + + " f INT" + + ") with (" + + " 'file.format' = '%s'" + + ")", + fileFormat); + // (int) Integer.MAX_VALUE + 1 == Integer.MIN_VALUE -> (int) 2147483648L == -2147483648 + sql("INSERT INTO T VALUES (1, 2147483647), (2, -2147483648)"); + sql("ALTER TABLE T MODIFY (f BIGINT)"); + assertThat(sql("SELECT * FROM T WHERE f < 2147483648")) + .containsExactlyInAnyOrder(Row.of(1, 2147483647L), Row.of(2, -2147483648L)); + assertThat(sql("SELECT * FROM T WHERE f > 2147483648")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f = 2147483647")) + .containsExactly(Row.of(1, 2147483647L)); + assertThat(sql("SELECT * FROM T WHERE f = 2147483648")).isEmpty(); + assertThat(sql("SELECT * FROM T WHERE f <> 2147483647")) + .containsExactly(Row.of(2, -2147483648L)); + assertThat(sql("SELECT * FROM T WHERE f <> 2147483648")) + .containsExactlyInAnyOrder(Row.of(1, 2147483647L), Row.of(2, -2147483648L)); } @TestTemplate From 99da05a94b80711941f0c558ba8d7b5db8ef90cd Mon Sep 17 00:00:00 2001 From: yuzelin Date: Wed, 18 Dec 2024 13:57:47 +0800 Subject: [PATCH 5/5] fix comments --- .../apache/paimon/casting/CastExecutors.java | 30 ++++++++----------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java b/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java index b0d9524adc17..546066d10aa3 100644 --- a/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java +++ b/paimon-common/src/main/java/org/apache/paimon/casting/CastExecutors.java @@ -108,12 +108,12 @@ public class CastExecutors { * is in whitelist. Otherwise, return Optional.empty(). */ public static Optional> castLiteralsWithEvolution( - List literals, DataType inputType, DataType outputType) { - if (inputType.equalsIgnoreNullable(outputType)) { + List literals, DataType predicateType, DataType dataType) { + if (predicateType.equalsIgnoreNullable(dataType)) { return Optional.of(literals); } - CastRule castRule = INSTANCE.internalResolve(inputType, outputType); + CastRule castRule = INSTANCE.internalResolve(predicateType, dataType); if (castRule == null) { return Optional.empty(); } @@ -122,28 +122,24 @@ public static Optional> castLiteralsWithEvolution( // Ignore float literals because pushing down float filter result is unpredictable. // For example, (double) 0.1F in Java is 0.10000000149011612. - if (inputType.is(DataTypeFamily.INTEGER_NUMERIC) - && outputType.is(DataTypeFamily.INTEGER_NUMERIC)) { + if (predicateType.is(DataTypeFamily.INTEGER_NUMERIC) + && dataType.is(DataTypeFamily.INTEGER_NUMERIC)) { // Ignore input scale < output scale because of overflow. // For example, alter 383 from INT to TINYINT, the query result is (byte) 383 == // 127. If we push down filter f = 127, 383 will be filtered out mistakenly. - if (integerScaleLargerThan(inputType.getTypeRoot(), outputType.getTypeRoot())) { - if (inputType.getTypeRoot() != DataTypeRoot.BIGINT) { - return Optional.of(literals); - } - - // Parquet filter Int comparator cannot handle long value. - // See org.apache.parquet.schema.PrimitiveType. - // So ignore filter if long literal is out of int scale. + if (integerScaleLargerThan(predicateType.getTypeRoot(), dataType.getTypeRoot())) { + CastExecutor castExecutor = + (CastExecutor) castRule.create(predicateType, dataType); List newLiterals = new ArrayList<>(literals.size()); for (Object literal : literals) { - long originalValue = (long) literal; - int newValue = (int) originalValue; - if (originalValue != newValue) { + Number literalNumber = (Number) literal; + Number newLiteralNumber = castExecutor.cast(literalNumber); + // Ignore if any literal is overflowed. + if (newLiteralNumber.longValue() != literalNumber.longValue()) { return Optional.empty(); } - newLiterals.add(newValue); + newLiterals.add(newLiteralNumber); } return Optional.of(newLiterals); }