From d399ed9378f60b61703fb3901a537585889a0bda Mon Sep 17 00:00:00 2001 From: yuzelin Date: Wed, 9 Jul 2025 17:12:34 +0800 Subject: [PATCH 1/6] [core] Fix stats filter by pushdown after schema evolution --- .../apache/paimon/io/RecordLevelExpire.java | 2 +- .../operation/AppendOnlyFileStoreScan.java | 6 +- .../operation/KeyValueFileStoreScan.java | 27 ++-- .../paimon/schema/SchemaEvolutionUtil.java | 17 ++- .../paimon/stats/SimpleStatsEvolutions.java | 22 ++- .../paimon/table/PrimaryKeyTableUtils.java | 6 +- .../paimon/utils/FormatReaderMapping.java | 6 +- .../schema/SchemaEvolutionUtilTest.java | 3 +- .../table/ColumnTypeFileDataTestBase.java | 14 +- .../table/ColumnTypeFileMetaTestBase.java | 130 +----------------- .../PrimaryKeyColumnTypeFileDataTest.java | 5 + ...PrimaryKeyTableColumnTypeFileMetaTest.java | 48 +------ .../FilterPushdownWithSchemaChangeITCase.java | 26 ++++ 13 files changed, 108 insertions(+), 204 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java index 9e05ed1674d8..6bd4df6ce3fe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java @@ -112,7 +112,7 @@ private RecordLevelExpire( fieldValueStatsConverters = new SimpleStatsEvolutions( - sid -> extractor.valueFields(scanTableSchema(sid)), schema.id()); + sid -> extractor.valueFields(scanTableSchema(sid)), schema.id(), false); } public boolean isExpireFile(DataFileMeta file) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index 9e4bf12f04fb..7282ef745b2f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -67,7 +67,7 
@@ public AppendOnlyFileStoreScan( scanManifestParallelism); this.bucketSelectConverter = bucketSelectConverter; this.simpleStatsEvolutions = - new SimpleStatsEvolutions(sid -> scanTableSchema(sid).fields(), schema.id()); + new SimpleStatsEvolutions(sid -> scanTableSchema(sid).fields(), schema.id(), false); this.fileIndexReadEnabled = fileIndexReadEnabled; } @@ -80,7 +80,7 @@ public AppendOnlyFileStoreScan withFilter(Predicate predicate) { /** Note: Keep this thread-safe. */ @Override protected boolean filterByStats(ManifestEntry entry) { - if (filter == null) { + if (filter == null || simpleStatsEvolutions.statsFilterUnsafe(entry, filter)) { return true; } @@ -111,7 +111,7 @@ private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry entry.file().schemaId(), id -> simpleStatsEvolutions.tryDevolveFilter( - entry.file().schemaId(), filter)); + entry.file().schemaId(), filter, false)); try (FileIndexPredicate predicate = new FileIndexPredicate(embeddedIndexBytes, dataRowType)) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index 9845b01346cf..ebaa39bd98df 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -88,7 +88,8 @@ public KeyValueFileStoreScan( this.fieldKeyStatsConverters = new SimpleStatsEvolutions( sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), - schema.id()); + schema.id(), + true); this.fieldValueStatsConverters = new SimpleStatsEvolutions( sid -> keyValueFieldsExtractor.valueFields(scanTableSchema(sid)), @@ -119,21 +120,21 @@ public FileStoreScan enableValueFilter() { /** Note: Keep this thread-safe. 
*/ @Override protected boolean filterByStats(ManifestEntry entry) { - DataFileMeta file = entry.file(); if (isValueFilterEnabled() && !filterByValueFilter(entry)) { return false; } - if (keyFilter != null) { - SimpleStatsEvolution.Result stats = - fieldKeyStatsConverters - .getOrCreate(file.schemaId()) - .evolution(file.keyStats(), file.rowCount(), null); - return keyFilter.test( - file.rowCount(), stats.minValues(), stats.maxValues(), stats.nullCounts()); + if (keyFilter == null || fieldKeyStatsConverters.statsFilterUnsafe(entry, keyFilter)) { + return true; } - return true; + DataFileMeta file = entry.file(); + SimpleStatsEvolution.Result stats = + fieldKeyStatsConverters + .getOrCreate(file.schemaId()) + .evolution(file.keyStats(), file.rowCount(), null); + return keyFilter.test( + file.rowCount(), stats.minValues(), stats.maxValues(), stats.nullCounts()); } @Override @@ -157,7 +158,7 @@ private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestE entry.file().schemaId(), id -> fieldValueStatsConverters.tryDevolveFilter( - entry.file().schemaId(), valueFilter)); + entry.file().schemaId(), valueFilter, false)); return predicate.evaluate(dataPredicate).remain(); } catch (IOException e) { throw new RuntimeException("Exception happens while checking fileIndex predicate.", e); @@ -225,6 +226,10 @@ private boolean filterByValueFilter(ManifestEntry entry) { return ((FilteredManifestEntry) entry).selected(); } + if (fieldValueStatsConverters.statsFilterUnsafe(entry, valueFilter)) { + return true; + } + DataFileMeta file = entry.file(); SimpleStatsEvolution.Result result = fieldValueStatsConverters diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java index d30fe19abbac..ef762a34839f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java +++ 
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java @@ -31,6 +31,7 @@ import org.apache.paimon.predicate.LeafPredicate; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateReplaceVisitor; +import org.apache.paimon.table.PrimaryKeyTableUtils; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; @@ -131,11 +132,17 @@ public CastFieldGetter[] getCastMapping() { * @param tableFields the table fields * @param dataFields the underlying data fields * @param filters the filters + * @param isKeyFilter whether the filter is for system _KEY_ fields + * @param nullSafe whether to return predicate if the predicate field name isn't in dataFields * @return the data filters */ @Nullable public static List devolveDataFilters( - List tableFields, List dataFields, List filters) { + List tableFields, + List dataFields, + List filters, + boolean isKeyFilter, + boolean nullSafe) { if (filters == null) { return null; } @@ -148,13 +155,17 @@ public static List devolveDataFilters( PredicateReplaceVisitor visitor = predicate -> { + String fieldName = predicate.fieldName(); + if (isKeyFilter) { + fieldName = PrimaryKeyTableUtils.addKeyNamePrefix(fieldName); + } DataField tableField = checkNotNull( - nameToTableFields.get(predicate.fieldName()), + nameToTableFields.get(fieldName), String.format("Find no field %s", predicate.fieldName())); DataField dataField = idToDataFields.get(tableField.id()); if (dataField == null) { - return Optional.empty(); + return nullSafe ? 
Optional.of(predicate) : Optional.empty(); } return CastExecutors.castLiteralsWithEvolution( diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java index 566cae9e6592..1450f38cf909 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java @@ -18,6 +18,7 @@ package org.apache.paimon.stats; +import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.IndexCastMapping; import org.apache.paimon.schema.SchemaEvolutionUtil; @@ -44,13 +45,20 @@ public class SimpleStatsEvolutions { private final List tableDataFields; private final AtomicReference> tableFields; private final ConcurrentMap evolutions; + private final boolean isKeyStats; public SimpleStatsEvolutions(Function> schemaFields, long tableSchemaId) { + this(schemaFields, tableSchemaId, false); + } + + public SimpleStatsEvolutions( + Function> schemaFields, long tableSchemaId, boolean isKeyStats) { this.schemaFields = schemaFields; this.tableSchemaId = tableSchemaId; this.tableDataFields = schemaFields.apply(tableSchemaId); this.tableFields = new AtomicReference<>(); this.evolutions = new ConcurrentHashMap<>(); + this.isKeyStats = isKeyStats; } public SimpleStatsEvolution getOrCreate(long dataSchemaId) { @@ -78,19 +86,27 @@ public SimpleStatsEvolution getOrCreate(long dataSchemaId) { } @Nullable - public Predicate tryDevolveFilter(long dataSchemaId, Predicate filter) { + public Predicate tryDevolveFilter(long dataSchemaId, Predicate filter, boolean nullSafe) { if (tableSchemaId == dataSchemaId) { return filter; } List devolved = Objects.requireNonNull( SchemaEvolutionUtil.devolveDataFilters( - schemaFields.apply(tableSchemaId), + tableDataFields, schemaFields.apply(dataSchemaId), - Collections.singletonList(filter))); + 
Collections.singletonList(filter), + isKeyStats, + nullSafe)); return devolved.isEmpty() ? null : devolved.get(0); } + // Stats filter is unsafe if it should be devolved and the devolving is unsafe + // null safe: it's safe if the predicate field is added later and filter it on the old schema + public boolean statsFilterUnsafe(ManifestEntry entry, Predicate statsFilter) { + return tryDevolveFilter(entry.file().schemaId(), statsFilter, true) == null; + } + public List tableDataFields() { return tableDataFields; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java index a47e44718ebe..e4c502d94269 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java @@ -46,10 +46,14 @@ public static RowType addKeyNamePrefix(RowType type) { public static List addKeyNamePrefix(List keyFields) { return keyFields.stream() - .map(f -> f.newName(KEY_FIELD_PREFIX + f.name()).newId(f.id() + KEY_FIELD_ID_START)) + .map(f -> f.newName(addKeyNamePrefix(f.name())).newId(f.id() + KEY_FIELD_ID_START)) .collect(Collectors.toList()); } + public static String addKeyNamePrefix(String keyName) { + return KEY_FIELD_PREFIX + keyName; + } + public static MergeFunctionFactory createMergeFunctionFactory( TableSchema tableSchema, KeyValueFieldsExtractor extractor) { RowType rowType = tableSchema.logicalRowType(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java index 8de9bf8508fa..c7529b9a8a7b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java @@ -309,7 +309,11 @@ private List readFilters( tableSchema.id() == dataSchema.id() ? 
filters : SchemaEvolutionUtil.devolveDataFilters( - tableSchema.fields(), dataSchema.fields(), filters); + tableSchema.fields(), + dataSchema.fields(), + filters, + false, + false); // Skip pushing down partition filters to reader. return excludePredicateWithFields( diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java index 291ad0ef4fc0..033db6948159 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java @@ -91,7 +91,8 @@ public void testDevolveDataFilters() { IsNull.INSTANCE, DataTypes.INT(), 7, "a", Collections.emptyList())); List filters = - SchemaEvolutionUtil.devolveDataFilters(tableFields2, dataFields, predicates); + SchemaEvolutionUtil.devolveDataFilters( + tableFields2, dataFields, predicates, false, false); assert filters != null; assertThat(filters.size()).isEqualTo(1); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileDataTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileDataTestBase.java index af927377eed1..5075343adfe6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileDataTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileDataTestBase.java @@ -119,14 +119,8 @@ public void testTableSplitFilterNormalFields() throws Exception { (files, schemas) -> { FileStoreTable table = createFileStoreTable(schemas); - /** - * Filter field "g" in [200, 500] in SCHEMA_FIELDS which is updated from bigint - * to float and will get another file with one data as followed: - * - *
    - *
  • 2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410") - *
- */ + // Filter field "g" in [200, 500] in SCHEMA_FIELDS cannot filter old file 1 and + // file 2 because filter is forbidden List splits = toSplits( table.newSnapshotReader() @@ -139,8 +133,12 @@ public void testTableSplitFilterNormalFields() throws Exception { List fieldGetterList = getFieldGetterList(table); assertThat(getResult(table.newRead(), splits, fieldGetterList)) .containsExactlyInAnyOrder( + // old file1 + "1|100|101|102.0|103|104.00|105.0|106.0|107.00|108|109|110", + // old file2 "2|200|201|202.0|203|204.00|205.0|206.0|207.00|208|209|210", "2|300|301|302.0|303|304.00|305.0|306.0|307.00|308|309|310", + // normal filtered data "2|400|401|402.0|403|404.00|405.0|406.0|407.00|408|409|410"); }, getPrimaryKeyNames(), diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java index 7f264fa817b2..746618ee0657 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java @@ -137,16 +137,8 @@ public void testTableSplitFilterNormalFields() throws Exception { (files, schemas) -> { FileStoreTable table = createFileStoreTable(schemas); - /* - Filter field "g" in [200, 500] in SCHEMA_FIELDS which is updated from bigint - to float and will get another file with one data as followed: - -
    -
  • 2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410") -
- -

Then we can check the results of the two result files. - */ + // Filter field "g" in [200, 500], get old file 1,2 (filter is forbidden) and + // new file 1 List splits = table.newSnapshotReader() .withFilter( @@ -154,7 +146,7 @@ public void testTableSplitFilterNormalFields() throws Exception { .between(6, 200F, 500F)) .read() .dataSplits(); - checkFilterRowCount(toDataFileMetas(splits), 3L); + checkFilterRowCount(toDataFileMetas(splits), 4L); List filesName = files.stream().map(DataFileMeta::fileName).collect(Collectors.toList()); @@ -169,9 +161,6 @@ public void testTableSplitFilterNormalFields() throws Exception { .map(DataFileMeta::fileName) .collect(Collectors.toList())) .containsAll(filesName); - - validateValuesWithNewSchema( - schemas, table.schema().id(), filesName, fileMetaList); }, getPrimaryKeyNames(), tableConfig, @@ -220,125 +209,12 @@ public void testTableSplitFilterPrimaryKeyFields() throws Exception { .map(DataFileMeta::fileName) .collect(Collectors.toList())) .containsAll(filesName); - - // Compare all columns with table column type - validateValuesWithNewSchema( - schemas, table.schema().id(), filesName, fileMetaList); }, getPrimaryKeyNames(), tableConfig, this::createFileStoreTable); } - protected void validateValuesWithNewSchema( - Map tableSchemas, - long schemaId, - List filesName, - List fileMetaList) { - Function> schemaFields = id -> tableSchemas.get(id).fields(); - SimpleStatsEvolutions converters = new SimpleStatsEvolutions(schemaFields, schemaId); - for (DataFileMeta fileMeta : fileMetaList) { - SimpleStats stats = getTableValueStats(fileMeta); - SimpleStatsEvolution.Result result = - converters.getOrCreate(fileMeta.schemaId()).evolution(stats, null, null); - InternalRow min = result.minValues(); - InternalRow max = result.maxValues(); - assertThat(stats.minValues().getFieldCount()).isEqualTo(12); - if (filesName.contains(fileMeta.fileName())) { - checkTwoValues(min, max); - } else { - checkOneValue(min, max); - } - } - } - - /** - * 
Check file data with one data. - * - *

    - *
  • data: - * 2,"400","401",402D,403,toDecimal(404),405F,406D,toDecimal(407),408,409,toBytes("410") - *
  • types: a->int, b->varchar[10], c->varchar[10], d->double, e->int, f->decimal,g->float, - * h->double, i->decimal, j->date, k->date, l->varbinary - *
- */ - private void checkOneValue(InternalRow min, InternalRow max) { - assertThat(min.getInt(0)).isEqualTo(max.getInt(0)).isEqualTo(2); - assertThat(min.getString(1)) - .isEqualTo(max.getString(1)) - .isEqualTo(BinaryString.fromString("400")); - assertThat(min.getString(2)) - .isEqualTo(max.getString(2)) - .isEqualTo(BinaryString.fromString("401")); - assertThat(min.getDouble(3)).isEqualTo(max.getDouble(3)).isEqualTo(402D); - assertThat(min.getInt(4)).isEqualTo(max.getInt(4)).isEqualTo(403); - assertThat(min.getDecimal(5, 10, 2).toBigDecimal().intValue()) - .isEqualTo(max.getDecimal(5, 10, 2).toBigDecimal().intValue()) - .isEqualTo(404); - assertThat(min.getFloat(6)).isEqualTo(max.getFloat(6)).isEqualTo(405F); - assertThat(min.getDouble(7)).isEqualTo(max.getDouble(7)).isEqualTo(406D); - assertThat(min.getDecimal(8, 10, 2).toBigDecimal().doubleValue()) - .isEqualTo(max.getDecimal(8, 10, 2).toBigDecimal().doubleValue()) - .isEqualTo(407D); - assertThat(min.getInt(9)).isEqualTo(max.getInt(9)).isEqualTo(408); - assertThat(min.getInt(10)).isEqualTo(max.getInt(10)).isEqualTo(409); - assertThat(min.isNullAt(11)).isEqualTo(max.isNullAt(11)).isTrue(); - } - - /** - * Check file with new types and data. - * - *
    - *
  • data1: 2,"200","201",toDecimal(202),(short)203,204,205L,206F,207D,208,toTimestamp(209 * - * millsPerDay),toBytes("210") - *
  • data2: 2,"300","301",toDecimal(302),(short)303,304,305L,306F,307D,308,toTimestamp(309 * - * millsPerDay),toBytes("310") - *
  • old types: a->int, b->char[10], c->varchar[10], d->decimal, e->smallint, f->int, - * g->bigint, h->float, i->double, j->date, k->timestamp, l->binary - *
  • new types: a->int, b->varchar[10], c->varchar[10], d->double, e->int, - * f->decimal,g->float, h->double, i->decimal, j->date, k->date, l->varbinary - *
- */ - private void checkTwoValues(InternalRow min, InternalRow max) { - assertThat(min.getInt(0)).isEqualTo(2); - assertThat(max.getInt(0)).isEqualTo(2); - - // parquet does not support padding - assertThat(min.getString(1).toString()).startsWith("200"); - assertThat(max.getString(1).toString()).startsWith("300"); - - assertThat(min.getString(2)).isEqualTo(BinaryString.fromString("201")); - assertThat(max.getString(2)).isEqualTo(BinaryString.fromString("301")); - - assertThat(min.getDouble(3)).isEqualTo(202D); - assertThat(max.getDouble(3)).isEqualTo(302D); - - assertThat(min.getInt(4)).isEqualTo(203); - assertThat(max.getInt(4)).isEqualTo(303); - - assertThat(min.getDecimal(5, 10, 2).toBigDecimal().intValue()).isEqualTo(204); - assertThat(max.getDecimal(5, 10, 2).toBigDecimal().intValue()).isEqualTo(304); - - assertThat(min.getFloat(6)).isEqualTo(205F); - assertThat(max.getFloat(6)).isEqualTo(305F); - - assertThat(min.getDouble(7)).isEqualTo(206D); - assertThat(max.getDouble(7)).isEqualTo(306D); - - assertThat(min.getDecimal(8, 10, 2).toBigDecimal().doubleValue()).isEqualTo(207D); - assertThat(max.getDecimal(8, 10, 2).toBigDecimal().doubleValue()).isEqualTo(307D); - - assertThat(min.getInt(9)).isEqualTo(208); - assertThat(max.getInt(9)).isEqualTo(308); - - assertThat(min.getInt(10)).isEqualTo(209); - assertThat(max.getInt(10)).isEqualTo(309); - - // Min and max value of binary type is null - assertThat(min.isNullAt(11)).isTrue(); - assertThat(max.isNullAt(11)).isTrue(); - } - @Override protected List getPrimaryKeyNames() { return SCHEMA_PRIMARY_KEYS; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java index 64bb5f21abbb..066bd6c57d3f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyColumnTypeFileDataTest.java 
@@ -65,6 +65,8 @@ public void testTableSplitFilterNormalFields() throws Exception { (files, schemas) -> { FileStoreTable table = createFileStoreTable(schemas); + // filter g: old file cannot apply filter, so the new file in the same partition + // also cannot be filtered, the result contains all data List splits = toSplits( table.newSnapshotReader() @@ -77,6 +79,9 @@ public void testTableSplitFilterNormalFields() throws Exception { List fieldGetterList = getFieldGetterList(table); assertThat(getResult(table.newRead(), splits, fieldGetterList)) .containsExactlyInAnyOrder( + "1|100|101|102.0|103|104.00|105.0|106.0|107.00|108|109|110", + "1|500|501|502.0|503|504.00|505.0|506.0|507.00|508|509|510", + "1|600|601|602.0|603|604.00|605.0|606.0|607.00|608|609|610", "2|200|201|202.0|203|204.00|205.0|206.0|207.00|208|209|210", "2|300|301|302.0|303|304.00|305.0|306.0|307.00|308|309|310", "2|400|401|402.0|403|404.00|405.0|406.0|407.00|408|409|410"); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java index 32a4138be564..80f0f7c64a82 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java @@ -105,54 +105,12 @@ public void testTableSplitFilterNormalFields() throws Exception { .between(6, 200F, 500F)) .read() .dataSplits(); - // filtered and only 3 rows left - checkFilterRowCount(toDataFileMetas(splits), 3L); + // filter g: old file cannot apply filter, so the new file in the same partition + // also cannot be filtered, the result contains all data + checkFilterRowCount(toDataFileMetas(splits), 6L); }, getPrimaryKeyNames(), tableConfig, this::createFileStoreTable); } - - /** We can only validate the values in primary keys for changelog with key table. 
*/ - @Override - protected void validateValuesWithNewSchema( - Map tableSchemas, - long schemaId, - List filesName, - List fileMetaList) { - Function> schemaFields = - id -> tableSchemas.get(id).logicalTrimmedPrimaryKeysType().getFields(); - SimpleStatsEvolutions converters = new SimpleStatsEvolutions(schemaFields, schemaId); - for (DataFileMeta fileMeta : fileMetaList) { - SimpleStats stats = getTableValueStats(fileMeta); - SimpleStatsEvolution.Result result = - converters.getOrCreate(fileMeta.schemaId()).evolution(stats, null, null); - InternalRow min = result.minValues(); - InternalRow max = result.maxValues(); - assertThat(min.getFieldCount()).isEqualTo(4); - if (filesName.contains(fileMeta.fileName())) { - // parquet does not support padding - assertThat(min.getString(0).toString()).startsWith("200"); - assertThat(max.getString(0).toString()).startsWith("300"); - - assertThat(min.getString(1)).isEqualTo(BinaryString.fromString("201")); - assertThat(max.getString(1)).isEqualTo(BinaryString.fromString("301")); - - assertThat(min.getDouble(2)).isEqualTo(202D); - assertThat(max.getDouble(2)).isEqualTo(302D); - - assertThat(min.getInt(3)).isEqualTo(203); - assertThat(max.getInt(3)).isEqualTo(303); - } else { - assertThat(min.getString(0)) - .isEqualTo(max.getString(0)) - .isEqualTo(BinaryString.fromString("400")); - assertThat(min.getString(1)) - .isEqualTo(max.getString(1)) - .isEqualTo(BinaryString.fromString("401")); - assertThat(min.getDouble(2)).isEqualTo(max.getDouble(2)).isEqualTo(402D); - assertThat(min.getInt(3)).isEqualTo(max.getInt(3)).isEqualTo(403); - } - } - } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java index 3b12ceabe2da..73ea8f0ae119 100644 --- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FilterPushdownWithSchemaChangeITCase.java @@ -230,4 +230,30 @@ public void testNumericToString() { assertThat(sql("SELECT * FROM T WHERE f = 1")).containsExactly(Row.of(1, 1)); assertThat(sql("SELECT * FROM T WHERE f <> 1")).containsExactly(Row.of(2, 111)); } + + @TestTemplate + public void testAppendStringToNumericForStatsFilter() { + sql("CREATE TABLE T (a STRING)"); + sql("INSERT INTO T VALUES ('9'), ('10'), ('11')"); + sql("ALTER TABLE T MODIFY (a INT)"); + + assertThat(sql("SELECT * FROM T WHERE a > 9")) + .containsExactlyInAnyOrder(Row.of(10), Row.of(11)); + } + + @TestTemplate + public void testPrimaryStringToNumericForStatsFilter() { + sql("CREATE TABLE T (pk STRING PRIMARY KEY NOT ENFORCED, v STRING)"); + sql("INSERT INTO T VALUES ('9', '9'), ('10', '10'), ('11', '11')"); + + // key filter + sql("ALTER TABLE T MODIFY (pk INT)"); + assertThat(sql("SELECT * FROM T WHERE pk > 9")) + .containsExactlyInAnyOrder(Row.of(10, "10"), Row.of(11, "11")); + + // value filter + sql("ALTER TABLE T MODIFY (v INT)"); + assertThat(sql("SELECT * FROM T WHERE v > 9")) + .containsExactlyInAnyOrder(Row.of(10, 10), Row.of(11, 11)); + } } From 3f059a33bcf7f8e955d8acd56701a514b87e462a Mon Sep 17 00:00:00 2001 From: yuzelin Date: Wed, 9 Jul 2025 17:14:40 +0800 Subject: [PATCH 2/6] fix --- .../apache/paimon/table/ColumnTypeFileMetaTestBase.java | 7 ------- .../table/PrimaryKeyTableColumnTypeFileMetaTest.java | 6 ------ 2 files changed, 13 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java index 746618ee0657..e92ce9b49bc9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java +++ 
b/paimon-core/src/test/java/org/apache/paimon/table/ColumnTypeFileMetaTestBase.java @@ -20,23 +20,16 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.data.BinaryString; -import org.apache.paimon.data.InternalRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; -import org.apache.paimon.schema.TableSchema; import org.apache.paimon.stats.SimpleStats; -import org.apache.paimon.stats.SimpleStatsEvolution; -import org.apache.paimon.stats.SimpleStatsEvolutions; import org.apache.paimon.table.source.DataSplit; -import org.apache.paimon.types.DataField; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.List; -import java.util.Map; -import java.util.function.Function; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java index 80f0f7c64a82..359d5e7b9dda 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyTableColumnTypeFileMetaTest.java @@ -18,25 +18,19 @@ package org.apache.paimon.table; -import org.apache.paimon.data.BinaryString; -import org.apache.paimon.data.InternalRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.stats.SimpleStats; -import org.apache.paimon.stats.SimpleStatsEvolution; -import org.apache.paimon.stats.SimpleStatsEvolutions; import org.apache.paimon.table.source.DataSplit; -import org.apache.paimon.types.DataField; import 
org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.List; import java.util.Map; -import java.util.function.Function; import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; From 6e4c2205d98c5f54e5092a4884d5f39c37a5e1da Mon Sep 17 00:00:00 2001 From: yuzelin Date: Mon, 21 Jul 2025 15:35:31 +0800 Subject: [PATCH 3/6] fix comments --- .../apache/paimon/io/RecordLevelExpire.java | 2 +- .../operation/AppendOnlyFileStoreScan.java | 12 ++-- .../operation/KeyValueFileStoreScan.java | 23 ++++--- .../paimon/schema/SchemaEvolutionUtil.java | 19 ++---- .../paimon/stats/SimpleStatsEvolutions.java | 35 ++++------- .../paimon/table/PrimaryKeyTableUtils.java | 6 +- .../paimon/utils/FormatReaderMapping.java | 6 +- .../schema/SchemaEvolutionUtilTest.java | 3 +- .../paimon/table/SchemaEvolutionTest.java | 60 ++++++++++++++++++- 9 files changed, 103 insertions(+), 63 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java index 6bd4df6ce3fe..9e05ed1674d8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RecordLevelExpire.java @@ -112,7 +112,7 @@ private RecordLevelExpire( fieldValueStatsConverters = new SimpleStatsEvolutions( - sid -> extractor.valueFields(scanTableSchema(sid)), schema.id(), false); + sid -> extractor.valueFields(scanTableSchema(sid)), schema.id()); } public boolean isExpireFile(DataFileMeta file) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index 7282ef745b2f..2d736241702c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -67,7 
+67,7 @@ public AppendOnlyFileStoreScan( scanManifestParallelism); this.bucketSelectConverter = bucketSelectConverter; this.simpleStatsEvolutions = - new SimpleStatsEvolutions(sid -> scanTableSchema(sid).fields(), schema.id(), false); + new SimpleStatsEvolutions(sid -> scanTableSchema(sid).fields(), schema.id()); this.fileIndexReadEnabled = fileIndexReadEnabled; } @@ -80,7 +80,9 @@ public AppendOnlyFileStoreScan withFilter(Predicate predicate) { /** Note: Keep this thread-safe. */ @Override protected boolean filterByStats(ManifestEntry entry) { - if (filter == null || simpleStatsEvolutions.statsFilterUnsafe(entry, filter)) { + Predicate safeFilter = + simpleStatsEvolutions.toEvolutionSafeFilter(entry.file().schemaId(), filter); + if (safeFilter == null) { return true; } @@ -91,7 +93,7 @@ protected boolean filterByStats(ManifestEntry entry) { entry.file().rowCount(), entry.file().valueStatsCols()); - return filter.test( + return safeFilter.test( entry.file().rowCount(), stats.minValues(), stats.maxValues(), @@ -110,8 +112,8 @@ private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry dataFilterMapping.computeIfAbsent( entry.file().schemaId(), id -> - simpleStatsEvolutions.tryDevolveFilter( - entry.file().schemaId(), filter, false)); + simpleStatsEvolutions.toEvolutionSafeFilter( + entry.file().schemaId(), filter)); try (FileIndexPredicate predicate = new FileIndexPredicate(embeddedIndexBytes, dataRowType)) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index ebaa39bd98df..69ba61a3476f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -85,11 +85,11 @@ public KeyValueFileStoreScan( manifestFileFactory, scanManifestParallelism); this.bucketSelectConverter = bucketSelectConverter; + // 
NOTE: don't add key prefix to field names because fieldKeyStatsConverters is used for + // filter conversion this.fieldKeyStatsConverters = new SimpleStatsEvolutions( - sid -> keyValueFieldsExtractor.keyFields(scanTableSchema(sid)), - schema.id(), - true); + sid -> scanTableSchema(sid).trimmedPrimaryKeysFields(), schema.id()); this.fieldValueStatsConverters = new SimpleStatsEvolutions( sid -> keyValueFieldsExtractor.valueFields(scanTableSchema(sid)), @@ -124,7 +124,9 @@ protected boolean filterByStats(ManifestEntry entry) { return false; } - if (keyFilter == null || fieldKeyStatsConverters.statsFilterUnsafe(entry, keyFilter)) { + Predicate safeKeyFilter = + fieldKeyStatsConverters.toEvolutionSafeFilter(entry.file().schemaId(), keyFilter); + if (safeKeyFilter == null) { return true; } @@ -133,7 +135,7 @@ protected boolean filterByStats(ManifestEntry entry) { fieldKeyStatsConverters .getOrCreate(file.schemaId()) .evolution(file.keyStats(), file.rowCount(), null); - return keyFilter.test( + return safeKeyFilter.test( file.rowCount(), stats.minValues(), stats.maxValues(), stats.nullCounts()); } @@ -157,8 +159,8 @@ private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestE schemaId2DataFilter.computeIfAbsent( entry.file().schemaId(), id -> - fieldValueStatsConverters.tryDevolveFilter( - entry.file().schemaId(), valueFilter, false)); + fieldValueStatsConverters.toEvolutionSafeFilter( + entry.file().schemaId(), valueFilter)); return predicate.evaluate(dataPredicate).remain(); } catch (IOException e) { throw new RuntimeException("Exception happens while checking fileIndex predicate.", e); @@ -226,7 +228,10 @@ private boolean filterByValueFilter(ManifestEntry entry) { return ((FilteredManifestEntry) entry).selected(); } - if (fieldValueStatsConverters.statsFilterUnsafe(entry, valueFilter)) { + Predicate safeValueFilter = + fieldValueStatsConverters.toEvolutionSafeFilter( + entry.file().schemaId(), valueFilter); + if (safeValueFilter == null) { return 
true; } @@ -235,7 +240,7 @@ private boolean filterByValueFilter(ManifestEntry entry) { fieldValueStatsConverters .getOrCreate(file.schemaId()) .evolution(file.valueStats(), file.rowCount(), file.valueStatsCols()); - return valueFilter.test( + return safeValueFilter.test( file.rowCount(), result.minValues(), result.maxValues(), diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java index ef762a34839f..155552770c02 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java @@ -132,17 +132,11 @@ public CastFieldGetter[] getCastMapping() { * @param tableFields the table fields * @param dataFields the underlying data fields * @param filters the filters - * @param isKeyFilter whether the filter is for system _KEY_ fields - * @param nullSafe whether to return predicate if the predicate field name isn't in dataFields * @return the data filters */ @Nullable public static List devolveDataFilters( - List tableFields, - List dataFields, - List filters, - boolean isKeyFilter, - boolean nullSafe) { + List tableFields, List dataFields, List filters) { if (filters == null) { return null; } @@ -155,17 +149,16 @@ public static List devolveDataFilters( PredicateReplaceVisitor visitor = predicate -> { - String fieldName = predicate.fieldName(); - if (isKeyFilter) { - fieldName = PrimaryKeyTableUtils.addKeyNamePrefix(fieldName); - } DataField tableField = checkNotNull( - nameToTableFields.get(fieldName), + nameToTableFields.get(predicate.fieldName()), String.format("Find no field %s", predicate.fieldName())); DataField dataField = idToDataFields.get(tableField.id()); + + // For example, add field b and filter b, the filter is safe for old file + // without field b if (dataField == null) { - return nullSafe ? 
Optional.of(predicate) : Optional.empty(); + return Optional.of(predicate); } return CastExecutors.castLiteralsWithEvolution( diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java index 1450f38cf909..a59e1e758ad0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java @@ -18,8 +18,8 @@ package org.apache.paimon.stats; -import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.IndexCastMapping; import org.apache.paimon.schema.SchemaEvolutionUtil; import org.apache.paimon.types.DataField; @@ -27,7 +27,6 @@ import javax.annotation.Nullable; -import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; @@ -45,20 +44,13 @@ public class SimpleStatsEvolutions { private final List tableDataFields; private final AtomicReference> tableFields; private final ConcurrentMap evolutions; - private final boolean isKeyStats; public SimpleStatsEvolutions(Function> schemaFields, long tableSchemaId) { - this(schemaFields, tableSchemaId, false); - } - - public SimpleStatsEvolutions( - Function> schemaFields, long tableSchemaId, boolean isKeyStats) { this.schemaFields = schemaFields; this.tableSchemaId = tableSchemaId; this.tableDataFields = schemaFields.apply(tableSchemaId); this.tableFields = new AtomicReference<>(); this.evolutions = new ConcurrentHashMap<>(); - this.isKeyStats = isKeyStats; } public SimpleStatsEvolution getOrCreate(long dataSchemaId) { @@ -85,26 +77,25 @@ public SimpleStatsEvolution getOrCreate(long dataSchemaId) { }); } + /** + * If the file's schema id != current table schema id, convert the filter to evolution safe + * filter or null if can't. 
+ */ @Nullable - public Predicate tryDevolveFilter(long dataSchemaId, Predicate filter, boolean nullSafe) { - if (tableSchemaId == dataSchemaId) { + public Predicate toEvolutionSafeFilter(long dataSchemaId, @Nullable Predicate filter) { + if (filter == null || dataSchemaId == tableSchemaId) { return filter; } + + // Filter p1 && p2, if only p1 is safe, we can return only p1 as a best-effort filter and let the + // compute engine perform p2. + List filters = PredicateBuilder.splitAnd(filter); List devolved = Objects.requireNonNull( SchemaEvolutionUtil.devolveDataFilters( - tableDataFields, - schemaFields.apply(dataSchemaId), - Collections.singletonList(filter), - isKeyStats, - nullSafe)); - return devolved.isEmpty() ? null : devolved.get(0); - } + tableDataFields, schemaFields.apply(dataSchemaId), filters)); - // Stats filter is unsafe if it should be devolved and the devolving is unsafe - // null safe: it's safe if the predicate field is added later and filter it on the old schema - public boolean statsFilterUnsafe(ManifestEntry entry, Predicate statsFilter) { - return tryDevolveFilter(entry.file().schemaId(), statsFilter, true) == null; + return devolved.isEmpty() ?
null : PredicateBuilder.and(devolved); } public List tableDataFields() { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java index e4c502d94269..a47e44718ebe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyTableUtils.java @@ -46,14 +46,10 @@ public static RowType addKeyNamePrefix(RowType type) { public static List addKeyNamePrefix(List keyFields) { return keyFields.stream() - .map(f -> f.newName(addKeyNamePrefix(f.name())).newId(f.id() + KEY_FIELD_ID_START)) + .map(f -> f.newName(KEY_FIELD_PREFIX + f.name()).newId(f.id() + KEY_FIELD_ID_START)) .collect(Collectors.toList()); } - public static String addKeyNamePrefix(String keyName) { - return KEY_FIELD_PREFIX + keyName; - } - public static MergeFunctionFactory createMergeFunctionFactory( TableSchema tableSchema, KeyValueFieldsExtractor extractor) { RowType rowType = tableSchema.logicalRowType(); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java index c7529b9a8a7b..8de9bf8508fa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java @@ -309,11 +309,7 @@ private List readFilters( tableSchema.id() == dataSchema.id() ? filters : SchemaEvolutionUtil.devolveDataFilters( - tableSchema.fields(), - dataSchema.fields(), - filters, - false, - false); + tableSchema.fields(), dataSchema.fields(), filters); // Skip pushing down partition filters to reader. 
return excludePredicateWithFields( diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java index 033db6948159..291ad0ef4fc0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java @@ -91,8 +91,7 @@ public void testDevolveDataFilters() { IsNull.INSTANCE, DataTypes.INT(), 7, "a", Collections.emptyList())); List filters = - SchemaEvolutionUtil.devolveDataFilters( - tableFields2, dataFields, predicates, false, false); + SchemaEvolutionUtil.devolveDataFilters(tableFields2, dataFields, predicates); assert filters != null; assertThat(filters.size()).isEqualTo(1); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java index 5d20e46ec274..e237c819c464 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java @@ -21,6 +21,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.DataFormatTestUtil; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; @@ -34,6 +35,7 @@ import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.sink.StreamTableWrite; import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.InnerTableRead; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.snapshot.SnapshotReader; @@ -56,6 +58,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.Map; 
import java.util.UUID; import java.util.function.Consumer; @@ -405,6 +408,59 @@ public void testCreateAlterSystemField() throws Exception { "_VALUE_KIND", SYSTEM_FIELD_NAMES))); } + @Test + public void testPushDownEvolutionSafeFilter() throws Exception { + Schema schema = + new Schema( + RowType.of(DataTypes.INT(), DataTypes.BIGINT()).getFields(), + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonMap("write-only", "true"), + ""); + schemaManager.createTable(schema); + + FileStoreTable table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath); + + try (StreamTableWrite write = table.newWrite(commitUser); + TableCommitImpl commit = table.newCommit(commitUser)) { + // file 1 + write.write(GenericRow.of(1, 1L)); + commit.commit(1, write.prepareCommit(false, 1)); + + // file 2 + write.write(GenericRow.of(2, 2L)); + commit.commit(2, write.prepareCommit(false, 2)); + } + + schemaManager.commitChanges( + Collections.singletonList(SchemaChange.updateColumnType("f0", DataTypes.STRING()))); + table = FileStoreTableFactory.create(LocalFileIO.create(), tablePath); + + try (StreamTableWrite write = table.newWrite(commitUser); + TableCommitImpl commit = table.newCommit(commitUser)) { + // file 3 + write.write(GenericRow.of(BinaryString.fromString("0"), 3L)); + commit.commit(3, write.prepareCommit(false, 3)); + + // file 4 + write.write(GenericRow.of(BinaryString.fromString("3"), 3L)); + commit.commit(4, write.prepareCommit(false, 4)); + } + + PredicateBuilder builder = new PredicateBuilder(table.schema().logicalRowType()); + // p: f0 >= '3' && f1 >= 2L + Predicate p = + PredicateBuilder.and( + builder.greaterOrEqual(0, BinaryString.fromString("3")), + builder.greaterOrEqual(1, 2L)); + // file 1 will be filtered by f1 >= 2L + // file 2 won't be filtered because f0 >= '3' is not safe + // file 3 will be filtered by f0 >= '3' + // file 4 won't be filtered + List rows = readRecords(table, p); + assertThat(rows).containsExactlyInAnyOrder("2, 2", 
"3, 3"); + } + private List readRecords(FileStoreTable table, Predicate filter) throws IOException { List results = new ArrayList<>(); forEachRemaining( @@ -423,7 +479,9 @@ private void forEachRemaining( if (filter != null) { snapshotReader.withFilter(filter); } - for (Split split : snapshotReader.read().dataSplits()) { + List dataSplits = snapshotReader.read().dataSplits(); + + for (Split split : dataSplits) { InnerTableRead read = table.newRead(); if (filter != null) { read.withFilter(filter); From a838e5c271b57325f0be8997ac9a6414ca9054a7 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Mon, 21 Jul 2025 19:25:30 +0800 Subject: [PATCH 4/6] fix --- .../main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java | 1 - .../test/java/org/apache/paimon/table/SchemaEvolutionTest.java | 1 - 2 files changed, 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java index 155552770c02..6cc68fd6a2c6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java @@ -31,7 +31,6 @@ import org.apache.paimon.predicate.LeafPredicate; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.predicate.PredicateReplaceVisitor; -import org.apache.paimon.table.PrimaryKeyTableUtils; import org.apache.paimon.types.ArrayType; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java index e237c819c464..24955dc94a5a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java @@ -58,7 +58,6 @@ import java.util.Collections; import java.util.HashMap; import 
java.util.List; -import java.util.Map; import java.util.UUID; import java.util.function.Consumer; From 59aa1e1ea6fb59ea1967f1254028b563dcf1741b Mon Sep 17 00:00:00 2001 From: yuzelin Date: Mon, 21 Jul 2025 19:58:20 +0800 Subject: [PATCH 5/6] fix 1 --- .../paimon/schema/SchemaEvolutionUtilTest.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java index 291ad0ef4fc0..4d4f7ddb6d71 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java @@ -92,12 +92,22 @@ public void testDevolveDataFilters() { List filters = SchemaEvolutionUtil.devolveDataFilters(tableFields2, dataFields, predicates); - assert filters != null; - assertThat(filters.size()).isEqualTo(1); + assertThat(filters).isNotNull(); + assertThat(filters.size()).isEqualTo(3); LeafPredicate child1 = (LeafPredicate) filters.get(0); assertThat(child1.function()).isEqualTo(IsNull.INSTANCE); assertThat(child1.fieldName()).isEqualTo("b"); assertThat(child1.index()).isEqualTo(1); + + LeafPredicate child2 = (LeafPredicate) filters.get(1); + assertThat(child2.function()).isEqualTo(IsNotNull.INSTANCE); + assertThat(child2.fieldName()).isEqualTo("e"); + assertThat(child2.index()).isEqualTo(9); + + LeafPredicate child3 = (LeafPredicate) filters.get(2); + assertThat(child3.function()).isEqualTo(IsNull.INSTANCE); + assertThat(child3.fieldName()).isEqualTo("a"); + assertThat(child3.index()).isEqualTo(7); } } From 95053247e9e742252614d87316c540d33edd28c1 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Mon, 21 Jul 2025 23:00:56 +0800 Subject: [PATCH 6/6] fix 2 --- .../operation/AppendOnlyFileStoreScan.java | 4 +- .../operation/KeyValueFileStoreScan.java | 7 +-- .../paimon/schema/SchemaEvolutionUtil.java | 51 
+++++++++++++------ .../paimon/stats/SimpleStatsEvolutions.java | 6 +-- .../paimon/utils/FormatReaderMapping.java | 4 +- .../java/org/apache/paimon/TestFileStore.java | 9 +++- .../schema/SchemaEvolutionUtilTest.java | 14 +---- 7 files changed, 56 insertions(+), 39 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java index 2d736241702c..0cf3dbdb69ac 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java @@ -81,7 +81,7 @@ public AppendOnlyFileStoreScan withFilter(Predicate predicate) { @Override protected boolean filterByStats(ManifestEntry entry) { Predicate safeFilter = - simpleStatsEvolutions.toEvolutionSafeFilter(entry.file().schemaId(), filter); + simpleStatsEvolutions.toEvolutionSafeStatsFilter(entry.file().schemaId(), filter); if (safeFilter == null) { return true; } @@ -112,7 +112,7 @@ private boolean testFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestEntry dataFilterMapping.computeIfAbsent( entry.file().schemaId(), id -> - simpleStatsEvolutions.toEvolutionSafeFilter( + simpleStatsEvolutions.toEvolutionSafeStatsFilter( entry.file().schemaId(), filter)); try (FileIndexPredicate predicate = diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java index 69ba61a3476f..a1b364679b2d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java @@ -125,7 +125,8 @@ protected boolean filterByStats(ManifestEntry entry) { } Predicate safeKeyFilter = - fieldKeyStatsConverters.toEvolutionSafeFilter(entry.file().schemaId(), keyFilter); + 
fieldKeyStatsConverters.toEvolutionSafeStatsFilter( + entry.file().schemaId(), keyFilter); if (safeKeyFilter == null) { return true; } @@ -159,7 +160,7 @@ private boolean filterByFileIndex(@Nullable byte[] embeddedIndexBytes, ManifestE schemaId2DataFilter.computeIfAbsent( entry.file().schemaId(), id -> - fieldValueStatsConverters.toEvolutionSafeFilter( + fieldValueStatsConverters.toEvolutionSafeStatsFilter( entry.file().schemaId(), valueFilter)); return predicate.evaluate(dataPredicate).remain(); } catch (IOException e) { @@ -229,7 +230,7 @@ private boolean filterByValueFilter(ManifestEntry entry) { } Predicate safeValueFilter = - fieldValueStatsConverters.toEvolutionSafeFilter( + fieldValueStatsConverters.toEvolutionSafeStatsFilter( entry.file().schemaId(), valueFilter); if (safeValueFilter == null) { return true; diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java index 6cc68fd6a2c6..e7e4831386f0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaEvolutionUtil.java @@ -131,11 +131,16 @@ public CastFieldGetter[] getCastMapping() { * @param tableFields the table fields * @param dataFields the underlying data fields * @param filters the filters + * @param forData true if devolve the filters for filtering data file, otherwise, for filtering + * manifest entry * @return the data filters */ @Nullable - public static List devolveDataFilters( - List tableFields, List dataFields, List filters) { + public static List devolveFilters( + List tableFields, + List dataFields, + List filters, + boolean forData) { if (filters == null) { return null; } @@ -153,23 +158,37 @@ public static List devolveDataFilters( nameToTableFields.get(predicate.fieldName()), String.format("Find no field %s", predicate.fieldName())); DataField dataField = 
idToDataFields.get(tableField.id()); - - // For example, add field b and filter b, the filter is safe for old file - // without field b if (dataField == null) { - return Optional.of(predicate); + // For example, add field b and filter b, the filter is safe for old file + // meta without field b because the index mapping array can handle null + return forData ? Optional.empty() : Optional.of(predicate); } - return CastExecutors.castLiteralsWithEvolution( - predicate.literals(), predicate.type(), dataField.type()) - .map( - literals -> - new LeafPredicate( - predicate.function(), - dataField.type(), - indexOf(dataField, idToDataFields), - dataField.name(), - literals)); + Optional<List<Object>> castedLiterals = + CastExecutors.castLiteralsWithEvolution( + predicate.literals(), predicate.type(), dataField.type()); + + // unsafe + if (!castedLiterals.isPresent()) { + return Optional.empty(); + } + + if (forData) { + // For data, the filter will be pushdown to data file, so must use the index + // and literal type of data file + return Optional.of( + new LeafPredicate( + predicate.function(), + dataField.type(), + indexOf(dataField, idToDataFields), + dataField.name(), + castedLiterals.get())); + } else { + // For meta, the index mapping array will map the index and cast the + // literals, so just return self + // In other words, return it if it's safe + return Optional.of(predicate); + } }; for (Predicate predicate : filters) { diff --git a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java index a59e1e758ad0..eb44ea706c89 100644 --- a/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java +++ b/paimon-core/src/main/java/org/apache/paimon/stats/SimpleStatsEvolutions.java @@ -82,7 +82,7 @@ public SimpleStatsEvolution getOrCreate(long dataSchemaId) { * filter or null if can't.
*/ @Nullable - public Predicate toEvolutionSafeFilter(long dataSchemaId, @Nullable Predicate filter) { + public Predicate toEvolutionSafeStatsFilter(long dataSchemaId, @Nullable Predicate filter) { if (filter == null || dataSchemaId == tableSchemaId) { return filter; } @@ -92,8 +92,8 @@ public Predicate toEvolutionSafeFilter(long dataSchemaId, @Nullable Predicate fi List filters = PredicateBuilder.splitAnd(filter); List devolved = Objects.requireNonNull( - SchemaEvolutionUtil.devolveDataFilters( - tableDataFields, schemaFields.apply(dataSchemaId), filters)); + SchemaEvolutionUtil.devolveFilters( + tableDataFields, schemaFields.apply(dataSchemaId), filters, false)); return devolved.isEmpty() ? null : PredicateBuilder.and(devolved); } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java index 8de9bf8508fa..1c8997174223 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/FormatReaderMapping.java @@ -308,8 +308,8 @@ private List readFilters( List dataFilters = tableSchema.id() == dataSchema.id() ? filters - : SchemaEvolutionUtil.devolveDataFilters( - tableSchema.fields(), dataSchema.fields(), filters); + : SchemaEvolutionUtil.devolveFilters( + tableSchema.fields(), dataSchema.fields(), filters, true); // Skip pushing down partition filters to reader. 
return excludePredicateWithFields( diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index efe4c1f62c92..7d303cad2dd7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -53,6 +53,7 @@ import org.apache.paimon.table.ExpireChangelogImpl; import org.apache.paimon.table.ExpireSnapshots; import org.apache.paimon.table.ExpireSnapshotsImpl; +import org.apache.paimon.table.SpecialFields; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ScanMode; @@ -126,7 +127,7 @@ private TestFileStore( valueType.getFields(), valueType.getFieldCount(), partitionType.getFieldNames(), - keyType.getFieldNames(), + cleanPrimaryKeys(keyType.getFieldNames()), Collections.emptyMap(), null), false, @@ -148,6 +149,12 @@ private TestFileStore( this.commitIdentifier = 0L; } + private static List cleanPrimaryKeys(List primaryKeys) { + return primaryKeys.stream() + .map(k -> k.substring(SpecialFields.KEY_FIELD_PREFIX.length())) + .collect(Collectors.toList()); + } + private static SchemaManager schemaManager(String root, CoreOptions options) { return new SchemaManager(FileIOFinder.find(new Path(root)), options.path()); } diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java index 4d4f7ddb6d71..2883b36b5c5d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaEvolutionUtilTest.java @@ -91,23 +91,13 @@ public void testDevolveDataFilters() { IsNull.INSTANCE, DataTypes.INT(), 7, "a", Collections.emptyList())); List filters = - SchemaEvolutionUtil.devolveDataFilters(tableFields2, dataFields, predicates); + 
SchemaEvolutionUtil.devolveFilters(tableFields2, dataFields, predicates, true); assertThat(filters).isNotNull(); - assertThat(filters.size()).isEqualTo(3); + assertThat(filters.size()).isEqualTo(1); LeafPredicate child1 = (LeafPredicate) filters.get(0); assertThat(child1.function()).isEqualTo(IsNull.INSTANCE); assertThat(child1.fieldName()).isEqualTo("b"); assertThat(child1.index()).isEqualTo(1); - - LeafPredicate child2 = (LeafPredicate) filters.get(1); - assertThat(child2.function()).isEqualTo(IsNotNull.INSTANCE); - assertThat(child2.fieldName()).isEqualTo("e"); - assertThat(child2.index()).isEqualTo(9); - - LeafPredicate child3 = (LeafPredicate) filters.get(2); - assertThat(child3.function()).isEqualTo(IsNull.INSTANCE); - assertThat(child3.fieldName()).isEqualTo("a"); - assertThat(child3.index()).isEqualTo(7); } }