diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index 9e4bf12f04fb..0cf3dbdb69ac 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -80,7 +80,9 @@ public AppendOnlyFileStoreScan withFilter(Predicate predicate) { /** Note: Keep this thread-safe. */ @Override protected boolean filterByStats(ManifestEntry entry) { - if (filter == null) { + Predicate safeFilter = + simpleStatsEvolutions.toEvolutionSafeStatsFilter(entry.file().schemaId(), filter); + if (safeFilter == null) { return true; } @@ -91,7 +93,7 @@ protected boolean filterByStats(ManifestEntry entry) { entry.file().rowCount(), entry.file().valueStatsCols()); - return filter.test( + return safeFilter.test( entry.file().rowCount(), stats.minValues(), stats.maxValues(), @@ -110,7 +112,7 @@ private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry dataFilterMapping.computeIfAbsent( entry.file().schemaId(), id -> - simpleStatsEvolutions.tryDevolveFilter( + simpleStatsEvolutions.toEvolutionSafeStatsFilter( entry.file().schemaId(), filter)); try (FileIndexPredicate predicate = diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index 9845b01346cf..a1b364679b2d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -85,10 +85,11 @@ public KeyValueFileStoreScan( manifestFileFactory, scanManifestParallelism); this.bucketSelectConverter = bucketSelectConverter; + // NOTE: don't add key prefix to field names because fieldKeyStatsConverters is used for + // filter conversion this.fieldKeyStatsConverters = new SimpleStatsEvolutions( - sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), - schema.id()); + sid -> scanTableSchema(sid).trimmedPrimaryKeysFields(), schema.id()); this.fieldValueStatsConverters = new SimpleStatsEvolutions( sid -> keyValueFieldsExtractor.valueFields(scanTableSchema(sid)), @@ -119,21 +120,24 @@ public FileStoreScan enableValueFilter() { /** Note: Keep this thread-safe. */ @Override protected boolean filterByStats(ManifestEntry entry) { - DataFileMeta file = entry.file(); if (isValueFilterEnabled() && !filterByValueFilter(entry)) { return false; } - if (keyFilter != null) { - SimpleStatsEvolution.Result stats = - fieldKeyStatsConverters - .getOrCreate(file.schemaId()) - .evolution(file.keyStats(), file.rowCount(), null); - return keyFilter.test( - file.rowCount(), stats.minValues(), stats.maxValues(), stats.nullCounts()); + Predicate safeKeyFilter = + fieldKeyStatsConverters.toEvolutionSafeStatsFilter( + entry.file().schemaId(), keyFilter); + if (safeKeyFilter == null) { + return true; } - return true; + DataFileMeta file = entry.file(); + SimpleStatsEvolution.Result stats = + fieldKeyStatsConverters + .getOrCreate(file.schemaId()) + .evolution(file.keyStats(), file.rowCount(), null); + return safeKeyFilter.test( + file.rowCount(), stats.minValues(), stats.maxValues(), stats.nullCounts()); } @Override @@ -156,7 +160,7 @@ private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestE schemaId2DataFilter.computeIfAbsent( entry.file().schemaId(), id -> - fieldValueStatsConverters.tryDevolveFilter( + fieldValueStatsConverters.toEvolutionSafeStatsFilter( entry.file().schemaId(), valueFilter)); return predicate.evaluate(dataPredicate).remain(); } catch (IOException e) { @@ -225,12 +229,19 @@ private boolean filterByValueFilter(ManifestEntry entry) { return ((FilteredManifestEntry) entry).selected(); } + Predicate safeValueFilter = + fieldValueStatsConverters.toEvolutionSafeStatsFilter( + entry.file().schemaId(), valueFilter); + if (safeValueFilter == null) { + return true; + } + DataFileMeta file = entry.file(); SimpleStatsEvolution.Result result = fieldValueStatsConverters .getOrCreate(file.schemaId()) .evolution(file.valueStats(), file.rowCount(), file.valueStatsCols()); - return valueFilter.test( + return safeValueFilter.test( file.rowCount(), result.minValues(), result.maxValues(), diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java index d30fe19abbac..e7e4831386f0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java @@ -131,11 +131,16 @@ public CastFieldGetter[] getCastMapping() { * @param tableFields the table fields * @param dataFields the underlying data fields * @param filters the filters + * @param forData true if devolve the filters for filtering data file, otherwise, for filtering + * manifest entry * @return the data filters */ @Nullable - public static List devolveDataFilters( - List tableFields, List dataFields, List filters) { + public static List devolveFilters( + List tableFields, + List dataFields, + List filters, + boolean forData) { if (filters == null) { return null; } @@ -154,19 +159,36 @@ public static List devolveDataFilters( String.format("Find no field %s", predicate.fieldName())); DataField dataField = idToDataFields.get(tableField.id()); if (dataField == null) { + // For example, add field b and filter b, the filter is safe for old file + // meta without field b because the index mapping array can handle null + return forData ? Optional.empty() : Optional.of(predicate); + } + + Optional> castedLiterals = + CastExecutors.castLiteralsWithEvolution( + predicate.literals(), predicate.type(), dataField.type()); + + // unsafe + if (!castedLiterals.isPresent()) { return Optional.empty(); } - return CastExecutors.castLiteralsWithEvolution( - predicate.literals(), predicate.type(), dataField.type()) - .map( - literals -> - new LeafPredicate( - predicate.function(), - dataField.type(), - indexOf(dataField, idToDataFields), - dataField.name(), - literals)); + if (forData) { + // For data, the filter will be pushdown to data file, so must use the index + // and literal type of data file + return Optional.of( + new LeafPredicate( + predicate.function(), + dataField.type(), + indexOf(dataField, idToDataFields), + dataField.name(), + castedLiterals.get())); + } else { + // For meta, the index mapping array will map the index the cast the + // literals, so just return self + // In other words, return it if it's safe + return Optional.of(predicate); + } }; for (Predicate predicate : filters) { diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java index 566cae9e6592..eb44ea706c89 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java @@ -19,6 +19,7 @@ package org.apache.paimon.stats; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.IndexCastMapping; import org.apache.paimon.schema.SchemaEvolutionUtil; import org.apache.paimon.types.DataField; @@ -26,7 +27,6 @@ import javax.annotation.Nullable; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -77,18 +77,25 @@ public SimpleStatsEvolution getOrCreate(long dataSchemaId) { }); } + /** + * If the file's schema id != current table schema id, convert the filter to evolution safe + * filter or null if can't. + */ @Nullable - public Predicate tryDevolveFilter(long dataSchemaId, Predicate filter) { - if (tableSchemaId == dataSchemaId) { + public Predicate toEvolutionSafeStatsFilter(long dataSchemaId, @Nullable Predicate filter) { + if (filter == null || dataSchemaId == tableSchemaId) { return filter; } + + // Filter p1 && p2, if only p1 is safe, we can return only p1 to try best filter and let the + // compute engine to perform p2. + List filters = PredicateBuilder.splitAnd(filter); List devolved = Objects.requireNonNull( - SchemaEvolutionUtil.devolveDataFilters( - schemaFields.apply(tableSchemaId), - schemaFields.apply(dataSchemaId), - Collections.singletonList(filter))); - return devolved.isEmpty() ? null : devolved.get(0); + SchemaEvolutionUtil.devolveFilters( + tableDataFields, schemaFields.apply(dataSchemaId), filters, false)); + + return devolved.isEmpty() ? null : PredicateBuilder.and(devolved); } public List tableDataFields() { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java index 8de9bf8508fa..1c8997174223 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java @@ -308,8 +308,8 @@ private List readFilters( List dataFilters = tableSchema.id() == dataSchema.id() ? filters - : SchemaEvolutionUtil.devolveDataFilters( - tableSchema.fields(), dataSchema.fields(), filters); + : SchemaEvolutionUtil.devolveFilters( + tableSchema.fields(), dataSchema.fields(), filters, true); // Skip pushing down partition filters to reader. return excludePredicateWithFields( diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index efe4c1f62c92..7d303cad2dd7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -53,6 +53,7 @@ import org.apache.paimon.table.ExpireChangelogImpl; import org.apache.paimon.table.ExpireSnapshots; import org.apache.paimon.table.ExpireSnapshotsImpl; +import org.apache.paimon.table.SpecialFields; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ScanMode; @@ -126,7 +127,7 @@ private TestFileStore( valueType.getFields(), valueType.getFieldCount(), partitionType.getFieldNames(), - keyType.getFieldNames(), + cleanPrimaryKeys(keyType.getFieldNames()), Collections.emptyMap(), null), false, @@ -148,6 +149,12 @@ private TestFileStore( this.commitIdentifier = 0L; } + private static List cleanPrimaryKeys(List primaryKeys) { + return primaryKeys.stream() + .map(k -> k.substring(SpecialFields.KEY_FIELD_PREFIX.length())) + .collect(Collectors.toList()); + } + private static SchemaManager schemaManager(String root, CoreOptions options) { return new SchemaManager(FileIOFinder.find(new Path(root)), options.path()); } diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java index 291ad0ef4fc0..2883b36b5c5d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java @@ -91,8 +91,8 @@ public void testDevolveDataFilters() { IsNull.INSTANCE, DataTypes.INT(), 7, "a", Collections.emptyList())); List filters = - SchemaEvolutionUtil.devolveDataFilters(tableFields2, dataFields, predicates); - assert filters != null; + SchemaEvolutionUtil.devolveFilters(tableFields2, dataFields, predicates, true); + assertThat(filters).isNotNull(); assertThat(filters.size()).isEqualTo(1); LeafPredicate child1 = (LeafPredicate) filters.get(0); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileDataTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileDataTestBase.java index af927377eed1..5075343adfe6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileDataTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileDataTestBase.java @@ -119,14 +119,8 @@ public void testTableSplitFilterNormalFields() throws Exception { (files, schemas) -> { FileStoreTable table = createFileStoreTable(schemas); - /** - * Filter field "g" in [200, 500] in SCHEMA_FIELDS which is updated from bigint - * to float and will get another file with one data as followed: - * - *
    - *
  • 2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410") - *
- */ + // Filter field "g" in [200, 500] in SCHEMA_FIELDS cannot filter old file 1 and + // file 2 because filter is forbidden List splits = toSplits( table.newSnapshotReader() @@ -139,8 +133,12 @@ public void testTableSplitFilterNormalFields() throws Exception { List fieldGetterList = getFieldGetterList(table); assertThat(getResult(table.newRead(), splits, fieldGetterList)) .containsExactlyInAnyOrder( + // old file1 + "1|100|101|102.0|103|104.00|105.0|106.0|107.00|108|109|110", + // old file2 "2|200|201|202.0|203|204.00|205.0|206.0|207.00|208|209|210", "2|300|301|302.0|303|304.00|305.0|306.0|307.00|308|309|310", + // normal filtered data "2|400|401|402.0|403|404.00|405.0|406.0|407.00|408|409|410"); }, getPrimaryKeyNames(), diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java index 7f264fa817b2..e92ce9b49bc9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java @@ -20,23 +20,16 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryString; -import org.apache.paimon.data.InternalRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; -import org.apache.paimon.schema.TableSchema; import org.apache.paimon.stats.SimpleStats; -import org.apache.paimon.stats.SimpleStatsEvolution; -import org.apache.paimon.stats.SimpleStatsEvolutions; import org.apache.paimon.table.source.DataSplit; -import org.apache.paimon.types.DataField; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.List; -import java.util.Map; -import java.util.function.Function; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -137,16 +130,8 @@ public void testTableSplitFilterNormalFields() throws Exception { (files, schemas) -> { FileStoreTable table = createFileStoreTable(schemas); - /* - Filter field "g" in [200, 500] in SCHEMA_FIELDS which is updated from bigint - to float and will get another file with one data as followed: - -
    -
  • 2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410") -
- -

Then we can check the results of the two result files. - */ + // Filter field "g" in [200, 500], get old file 1,2 (filter is forbidden) and + // new file 1 List splits = table.newSnapshotReader() .withFilter( @@ -154,7 +139,7 @@ public void testTableSplitFilterNormalFields() throws Exception { .between(6, 200F, 500F)) .read() .dataSplits(); - checkFilterRowCount(toDataFileMetas(splits), 3L); + checkFilterRowCount(toDataFileMetas(splits), 4L); List filesName = files.stream().map(DataFileMeta::fileName).collect(Collectors.toList()); @@ -169,9 +154,6 @@ public void testTableSplitFilterNormalFields() throws Exception { .map(DataFileMeta::fileName) .collect(Collectors.toList())) .containsAll(filesName); - - validateValuesWithNewSchema( - schemas, table.schema().id(), filesName, fileMetaList); }, getPrimaryKeyNames(), tableConfig, @@ -220,125 +202,12 @@ public void testTableSplitFilterPrimaryKeyFields() throws Exception { .map(DataFileMeta::fileName) .collect(Collectors.toList())) .containsAll(filesName); - - // Compare all columns with table column type - validateValuesWithNewSchema( - schemas, table.schema().id(), filesName, fileMetaList); }, getPrimaryKeyNames(), tableConfig, this::createFileStoreTable); } - protected void validateValuesWithNewSchema( - Map tableSchemas, - long schemaId, - List filesName, - List fileMetaList) { - Function> schemaFields = id -> tableSchemas.get(id).fields(); - SimpleStatsEvolutions converters = new SimpleStatsEvolutions(schemaFields, schemaId); - for (DataFileMeta fileMeta : fileMetaList) { - SimpleStats stats = getTableValueStats(fileMeta); - SimpleStatsEvolution.Result result = - converters.getOrCreate(fileMeta.schemaId()).evolution(stats, null, null); - InternalRow min = result.minValues(); - InternalRow max = result.maxValues(); - assertThat(stats.minValues().getFieldCount()).isEqualTo(12); - if (filesName.contains(fileMeta.fileName())) { - checkTwoValues(min, max); - } else { - checkOneValue(min, max); - } - } - } - - /** - * Check file data with one data. - * - *

    - *
  • data: - * 2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410") - *
  • types: a->int, b->varchar[10], c->varchar[10], d->double, e->int, f->decimal,g->float, - * h->double, i->decimal, j->date, k->date, l->varbinary - *
- */ - private void checkOneValue(InternalRow min, InternalRow max) { - assertThat(min.getInt(0)).isEqualTo(max.getInt(0)).isEqualTo(2); - assertThat(min.getString(1)) - .isEqualTo(max.getString(1)) - .isEqualTo(BinaryString.fromString("400")); - assertThat(min.getString(2)) - .isEqualTo(max.getString(2)) - .isEqualTo(BinaryString.fromString("401")); - assertThat(min.getDouble(3)).isEqualTo(max.getDouble(3)).isEqualTo(402D); - assertThat(min.getInt(4)).isEqualTo(max.getInt(4)).isEqualTo(403); - assertThat(min.getDecimal(5, 10, 2).toBigDecimal().intValue()) - .isEqualTo(max.getDecimal(5, 10, 2).toBigDecimal().intValue()) - .isEqualTo(404); - assertThat(min.getFloat(6)).isEqualTo(max.getFloat(6)).isEqualTo(405F); - assertThat(min.getDouble(7)).isEqualTo(max.getDouble(7)).isEqualTo(406D); - assertThat(min.getDecimal(8, 10, 2).toBigDecimal().doubleValue()) - .isEqualTo(max.getDecimal(8, 10, 2).toBigDecimal().doubleValue()) - .isEqualTo(407D); - assertThat(min.getInt(9)).isEqualTo(max.getInt(9)).isEqualTo(408); - assertThat(min.getInt(10)).isEqualTo(max.getInt(10)).isEqualTo(409); - assertThat(min.isNullAt(11)).isEqualTo(max.isNullAt(11)).isTrue(); - } - - /** - * Check file with new types and data. - * - *
    - *
  • data1: 2,"200","201",toDecimal(202),(short)203,204,205L,206F,207D,208,toTimestamp(209 * - * millsPerDay),toBytes("210") - *
  • data2: 2,"300","301",toDecimal(302),(short)303,304,305L,306F,307D,308,toTimestamp(309 * - * millsPerDay),toBytes("310") - *
  • old types: a->int, b->char[10], c->varchar[10], d->decimal, e->smallint, f->int, - * g->bigint, h->float, i->double, j->date, k->timestamp, l->binary - *
  • new types: a->int, b->varchar[10], c->varchar[10], d->double, e->int, - * f->decimal,g->float, h->double, i->decimal, j->date, k->date, l->varbinary - *
- */ - private void checkTwoValues(InternalRow min, InternalRow max) { - assertThat(min.getInt(0)).isEqualTo(2); - assertThat(max.getInt(0)).isEqualTo(2); - - // parquet does not support padding - assertThat(min.getString(1).toString()).startsWith("200"); - assertThat(max.getString(1).toString()).startsWith("300"); - - assertThat(min.getString(2)).isEqualTo(BinaryString.fromString("201")); - assertThat(max.getString(2)).isEqualTo(BinaryString.fromString("301")); - - assertThat(min.getDouble(3)).isEqualTo(202D); - assertThat(max.getDouble(3)).isEqualTo(302D); - - assertThat(min.getInt(4)).isEqualTo(203); - assertThat(max.getInt(4)).isEqualTo(303); - - assertThat(min.getDecimal(5, 10, 2).toBigDecimal().intValue()).isEqualTo(204); - assertThat(max.getDecimal(5, 10, 2).toBigDecimal().intValue()).isEqualTo(304); - - assertThat(min.getFloat(6)).isEqualTo(205F); - assertThat(max.getFloat(6)).isEqualTo(305F); - - assertThat(min.getDouble(7)).isEqualTo(206D); - assertThat(max.getDouble(7)).isEqualTo(306D); - - assertThat(min.getDecimal(8, 10, 2).toBigDecimal().doubleValue()).isEqualTo(207D); - assertThat(max.getDecimal(8, 10, 2).toBigDecimal().doubleValue()).isEqualTo(307D); - - assertThat(min.getInt(9)).isEqualTo(208); - assertThat(max.getInt(9)).isEqualTo(308); - - assertThat(min.getInt(10)).isEqualTo(209); - assertThat(max.getInt(10)).isEqualTo(309); - - // Min and max value of binary type is null - assertThat(min.isNullAt(11)).isTrue(); - assertThat(max.isNullAt(11)).isTrue(); - } - @Override protected List getPrimaryKeyNames() { return SCHEMA_PRIMARY_KEYS; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java index 64bb5f21abbb..066bd6c57d3f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java @@ -65,6 +65,8 @@ public void testTableSplitFilterNormalFields() throws Exception { (files, schemas) -> { FileStoreTable table = createFileStoreTable(schemas); + // filter g: old file cannot apply filter, so the new file in the same partition + // also cannot be filtered, the result contains all data List splits = toSplits( table.newSnapshotReader() @@ -77,6 +79,9 @@ public void testTableSplitFilterNormalFields() throws Exception { List fieldGetterList = getFieldGetterList(table); assertThat(getResult(table.newRead(), splits, fieldGetterList)) .containsExactlyInAnyOrder( + "1|100|101|102.0|103|104.00|105.0|106.0|107.00|108|109|110", + "1|500|501|502.0|503|504.00|505.0|506.0|507.00|508|509|510", + "1|600|601|602.0|603|604.00|605.0|606.0|607.00|608|609|610", "2|200|201|202.0|203|204.00|205.0|206.0|207.00|208|209|210", "2|300|301|302.0|303|304.00|305.0|306.0|307.00|308|309|310", "2|400|401|402.0|403|404.00|405.0|406.0|407.00|408|409|410"); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java index 32a4138be564..359d5e7b9dda 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java @@ -18,25 +18,19 @@ package org.apache.paimon.table; -import org.apache.paimon.data.BinaryString; -import org.apache.paimon.data.InternalRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.stats.SimpleStats; -import org.apache.paimon.stats.SimpleStatsEvolution; -import org.apache.paimon.stats.SimpleStatsEvolutions; import org.apache.paimon.table.source.DataSplit; -import org.apache.paimon.types.DataField; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.List; import java.util.Map; -import java.util.function.Function; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; @@ -105,54 +99,12 @@ public void testTableSplitFilterNormalFields() throws Exception { .between(6, 200F, 500F)) .read() .dataSplits(); - // filtered and only 3 rows left - checkFilterRowCount(toDataFileMetas(splits), 3L); + // filter g: old file cannot apply filter, so the new file in the same partition + // also cannot be filtered, the result contains all data + checkFilterRowCount(toDataFileMetas(splits), 6L); }, getPrimaryKeyNames(), tableConfig, this::createFileStoreTable); } - - /** We can only validate the values in primary keys for changelog with key table. */ - @Override - protected void validateValuesWithNewSchema( - Map tableSchemas, - long schemaId, - List filesName, - List fileMetaList) { - Function> schemaFields = - id -> tableSchemas.get(id).logicalTrimmedPrimaryKeysType().getFields(); - SimpleStatsEvolutions converters = new SimpleStatsEvolutions(schemaFields, schemaId); - for (DataFileMeta fileMeta : fileMetaList) { - SimpleStats stats = getTableValueStats(fileMeta); - SimpleStatsEvolution.Result result = - converters.getOrCreate(fileMeta.schemaId()).evolution(stats, null, null); - InternalRow min = result.minValues(); - InternalRow max = result.maxValues(); - assertThat(min.getFieldCount()).isEqualTo(4); - if (filesName.contains(fileMeta.fileName())) { - // parquet does not support padding - assertThat(min.getString(0).toString()).startsWith("200"); - assertThat(max.getString(0).toString()).startsWith("300"); - - assertThat(min.getString(1)).isEqualTo(BinaryString.fromString("201")); - assertThat(max.getString(1)).isEqualTo(BinaryString.fromString("301")); - - assertThat(min.getDouble(2)).isEqualTo(202D); - assertThat(max.getDouble(2)).isEqualTo(302D); - - assertThat(min.getInt(3)).isEqualTo(203); - assertThat(max.getInt(3)).isEqualTo(303); - } else { - assertThat(min.getString(0)) - .isEqualTo(max.getString(0)) - .isEqualTo(BinaryString.fromString("400")); - assertThat(min.getString(1)) - .isEqualTo(max.getString(1)) - .isEqualTo(BinaryString.fromString("401")); - assertThat(min.getDouble(2)).isEqualTo(max.getDouble(2)).isEqualTo(402D); - assertThat(min.getInt(3)).isEqualTo(max.getInt(3)).isEqualTo(403); - } - } - } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java index 5d20e46ec274..24955dc94a5a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.DataFormatTestUtil; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -34,6 +35,7 @@ import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.snapshot.SnapshotReader; @@ -405,6 +407,59 @@ public void testCreateAlterSystemField() throws Exception { "_VALUE_KIND", SYSTEM_FIELD_NAMES))); } + @Test + public void testPushDownEvolutionSafeFilter() throws Exception { + Schema schema = + new Schema( + RowType.of(DataTypes.INT(), DataTypes.BIGINT()).getFields(), + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonMap("write-only", "true"), + ""); + schemaManager.createTable(schema); + + FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath); + + try (StreamTableWrite write = table.newWrite(commitUser); + TableCommitImpl commit = table.newCommit(commitUser)) { + // file 1 + write.write(GenericRow.of(1, 1L)); + commit.commit(1, write.prepareCommit(false, 1)); + + // file 2 + write.write(GenericRow.of(2, 2L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + + schemaManager.commitChanges( + Collections.singletonList(SchemaChange.updateColumnType("f0", DataTypes.STRING()))); + table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath); + + try (StreamTableWrite write = table.newWrite(commitUser); + TableCommitImpl commit = table.newCommit(commitUser)) { + // file 3 + write.write(GenericRow.of(BinaryString.fromString("0"), 3L)); + commit.commit(3, write.prepareCommit(false, 3)); + + // file 4 + write.write(GenericRow.of(BinaryString.fromString("3"), 3L)); + commit.commit(4, write.prepareCommit(false, 4)); + } + + PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType()); + // p: f0 >= '3' && f1 >= 2L + Predicate p = + PredicateBuilder.and( + builder.greaterOrEqual(0, BinaryString.fromString("3")), + builder.greaterOrEqual(1, 2L)); + // file 1 will be filtered by f1 >= 2L + // file 2 won't be filtered because f0 >= '3' is not safe + // file 3 will be filtered by f0 >= '3' + // file 4 won't be filtered + List rows = readRecords(table, p); + assertThat(rows).containsExactlyInAnyOrder("2, 2", "3, 3"); + } + private List readRecords(FileStoreTable table, Predicate filter) throws IOException { List results = new ArrayList<>(); forEachRemaining( @@ -423,7 +478,9 @@ private void forEachRemaining( if (filter != null) { snapshotReader.withFilter(filter); } - for (Split split : snapshotReader.read().dataSplits()) { + List dataSplits = snapshotReader.read().dataSplits(); + + for (Split split : dataSplits) { InnerTableRead read = table.newRead(); if (filter != null) { read.withFilter(filter); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java index 3b12ceabe2da..73ea8f0ae119 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java @@ -230,4 +230,30 @@ public void testNumericToString() { assertThat(sql("SELECT * FROM T WHERE f = 1")).containsExactly(Row.of(1, 1)); assertThat(sql("SELECT * FROM T WHERE f <> 1")).containsExactly(Row.of(2, 111)); } + + @TestTemplate + public void testAppendStringToNumericForStatsFilter() { + sql("CREATE TABLE T (a STRING)"); + sql("INSERT INTO T VALUES ('9'), ('10'), ('11')"); + sql("ALTER TABLE T MODIFY (a INT)"); + + assertThat(sql("SELECT * FROM T WHERE a > 9")) + .containsExactlyInAnyOrder(Row.of(10), Row.of(11)); + } + + @TestTemplate + public void testPrimaryStringToNumericForStatsFilter() { + sql("CREATE TABLE T (pk STRING PRIMARY KEY NOT ENFORCED, v STRING)"); + sql("INSERT INTO T VALUES ('9', '9'), ('10', '10'), ('11', '11')"); + + // key filter + sql("ALTER TABLE T MODIFY (pk INT)"); + assertThat(sql("SELECT * FROM T WHERE pk > 9")) + .containsExactlyInAnyOrder(Row.of(10, "10"), Row.of(11, "11")); + + // value filter + sql("ALTER TABLE T MODIFY (v INT)"); + assertThat(sql("SELECT * FROM T WHERE v > 9")) + .containsExactlyInAnyOrder(Row.of(10, 10), Row.of(11, 11)); + } }