From 405aa063f91652c5a00e910cf16af303edbe52b0 Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Wed, 10 Sep 2025 21:20:55 +0800 Subject: [PATCH] [core] Introduce 'ignore-update-before' to ignore UD only --- .../generated/core_configuration.html | 6 ++ .../java/org/apache/paimon/CoreOptions.java | 11 ++++ .../apache/paimon/utils/RowKindFilter.java | 64 +++++++++++++++++++ .../table/AppendOnlyFileStoreTable.java | 3 +- .../table/PrimaryKeyFileStoreTable.java | 3 +- .../paimon/table/sink/TableWriteImpl.java | 9 +-- .../paimon/flink/sink/LocalMergeOperator.java | 9 ++- .../paimon/flink/BatchFileStoreITCase.java | 18 +++++- .../flink/sink/LocalMergeOperatorTest.java | 47 ++++++++++++++ 9 files changed, 160 insertions(+), 10 deletions(-) create mode 100644 paimon-api/src/main/java/org/apache/paimon/utils/RowKindFilter.java diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 4273751caf0a..2ca6b6c7bc3f 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -518,6 +518,12 @@ Boolean Whether to ignore delete records. + +
ignore-update-before
+ false + Boolean + Whether to ignore update-before records. +
incremental-between
(none) diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java index 05ff57a617d2..e2e141f141af 100644 --- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java @@ -489,6 +489,13 @@ public InlineElement getDescription() { "partial-update.ignore-delete") .withDescription("Whether to ignore delete records."); + @Immutable + public static final ConfigOption IGNORE_UPDATE_BEFORE = + key("ignore-update-before") + .booleanType() + .defaultValue(false) + .withDescription("Whether to ignore update-before records."); + public static final ConfigOption SORT_ENGINE = key("sort-engine") .enumType(SortEngine.class) @@ -2288,6 +2295,10 @@ public boolean ignoreDelete() { return options.get(IGNORE_DELETE); } + public boolean ignoreUpdateBefore() { + return options.get(IGNORE_UPDATE_BEFORE); + } + public SortEngine sortEngine() { return options.get(SORT_ENGINE); } diff --git a/paimon-api/src/main/java/org/apache/paimon/utils/RowKindFilter.java b/paimon-api/src/main/java/org/apache/paimon/utils/RowKindFilter.java new file mode 100644 index 000000000000..fb6372ae0c10 --- /dev/null +++ b/paimon-api/src/main/java/org/apache/paimon/utils/RowKindFilter.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.types.RowKind; + +import javax.annotation.Nullable; + +/** A class to filter row kinds. */ +public class RowKindFilter { + + private final boolean ignoreAllRetracts; + private final boolean ignoreUpdateBefore; + + public RowKindFilter(boolean ignoreAllRetracts, boolean ignoreUpdateBefore) { + this.ignoreAllRetracts = ignoreAllRetracts; + this.ignoreUpdateBefore = ignoreUpdateBefore; + } + + @Nullable + public static RowKindFilter of(CoreOptions options) { + boolean ignoreAllRetracts = options.ignoreDelete(); + boolean ignoreUpdateBefore = options.ignoreUpdateBefore(); + if (!ignoreAllRetracts && !ignoreUpdateBefore) { + return null; + } + return new RowKindFilter(ignoreAllRetracts, ignoreUpdateBefore); + } + + public boolean test(RowKind rowKind) { + switch (rowKind) { + case DELETE: + if (ignoreAllRetracts) { + return false; + } + break; + case UPDATE_BEFORE: + if (ignoreUpdateBefore || ignoreAllRetracts) { + return false; + } + break; + default: + break; + } + return true; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java index 367eeb714b9a..4ac41361accb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java @@ -40,6 +40,7 @@ import org.apache.paimon.table.source.splitread.SplitReadConfig; import org.apache.paimon.table.source.splitread.SplitReadProvider; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.RowKindFilter; import javax.annotation.Nullable; @@ -139,7 +140,7 @@ public TableWriteImpl newWrite(String commitUser, @Nullable Integer return record.row(); }, rowKindGenerator(), - CoreOptions.fromMap(tableSchema.options()).ignoreDelete()); + RowKindFilter.of(coreOptions())); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java index b17f307f8d01..62631aaf8053 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java @@ -38,6 +38,7 @@ import org.apache.paimon.table.source.MergeTreeSplitGenerator; import org.apache.paimon.table.source.SplitGenerator; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.RowKindFilter; import javax.annotation.Nullable; @@ -170,7 +171,7 @@ public TableWriteImpl newWrite(String commitUser, @Nullable Integer wr rowKind, record.row()), rowKindGenerator(), - CoreOptions.fromMap(tableSchema.options()).ignoreDelete()); + RowKindFilter.of(coreOptions())); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java index ac1499c75dc9..99e30bcccf8d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java @@ -38,6 +38,7 @@ import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Restorable; +import org.apache.paimon.utils.RowKindFilter; import javax.annotation.Nullable; @@ -59,7 +60,7 @@ public class TableWriteImpl implements InnerTableWrite, Restorable keyAndBucketExtractor; private final RecordExtractor recordExtractor; @Nullable private final RowKindGenerator rowKindGenerator; - private final boolean ignoreDelete; + @Nullable private final RowKindFilter rowKindFilter; private boolean batchCommitted = false; private BucketMode bucketMode; @@ -73,13 +74,13 @@ public TableWriteImpl( KeyAndBucketExtractor keyAndBucketExtractor, RecordExtractor recordExtractor, @Nullable RowKindGenerator rowKindGenerator, - boolean ignoreDelete) { + @Nullable RowKindFilter rowKindFilter) { this.rowType = rowType; this.write = write; this.keyAndBucketExtractor = keyAndBucketExtractor; this.recordExtractor = recordExtractor; this.rowKindGenerator = rowKindGenerator; - this.ignoreDelete = ignoreDelete; + this.rowKindFilter = rowKindFilter; List notNullColumnNames = rowType.getFields().stream() @@ -183,7 +184,7 @@ public SinkRecord writeAndReturn(InternalRow row, int bucket) throws Exception { checkNullability(row); row = wrapDefaultValue(row); RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row); - if (ignoreDelete && rowKind.isRetract()) { + if (rowKindFilter != null && !rowKindFilter.test(rowKind)) { return null; } SinkRecord record = bucket == -1 ? toSinkRecord(row) : toSinkRecord(row, bucket); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java index 070262147643..bb48dbf36317 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/LocalMergeOperator.java @@ -41,6 +41,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.KeyComparatorSupplier; import org.apache.paimon.utils.Preconditions; +import org.apache.paimon.utils.RowKindFilter; import org.apache.paimon.utils.UserDefinedSeqComparator; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -56,6 +57,8 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import javax.annotation.Nullable; + import java.util.List; import static org.apache.paimon.table.PrimaryKeyTableUtils.addKeyNamePrefix; @@ -70,7 +73,7 @@ public class LocalMergeOperator extends AbstractStreamOperator private static final long serialVersionUID = 1L; private final TableSchema schema; - private final boolean ignoreDelete; + private final @Nullable RowKindFilter rowKindFilter; private transient Projection keyProjection; @@ -87,7 +90,7 @@ private LocalMergeOperator( schema.primaryKeys().size() > 0, "LocalMergeOperator currently only support tables with primary keys"); this.schema = schema; - this.ignoreDelete = CoreOptions.fromMap(schema.options()).ignoreDelete(); + this.rowKindFilter = RowKindFilter.of(CoreOptions.fromMap(schema.options())); setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); } @@ -170,7 +173,7 @@ public void processElement(StreamRecord record) throws Exception { InternalRow row = record.getValue(); RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row); - if (ignoreDelete && rowKind.isRetract()) { + if (rowKindFilter != null && !rowKindFilter.test(rowKind)) { return; } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 066eeb31cac6..13425014f5e1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -619,7 +619,7 @@ public void testIgnoreDelete() { public void testIgnoreDeleteWithRowKindField() { sql( "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING, kind STRING) " - + "WITH ('merge-engine' = 'deduplicate', 'ignore-delete' = 'true', 'bucket' = '1', 'rowkind.field' = 'kind')"); + + "WITH ('ignore-delete' = 'true', 'bucket' = '1', 'rowkind.field' = 'kind')"); sql("INSERT INTO ignore_delete VALUES (1, 'A', '+I')"); assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A", "+I")); @@ -631,6 +631,22 @@ public void testIgnoreDeleteWithRowKindField() { assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "B", "+I")); } + @Test + public void testIgnoreUpdateBeforeWithRowKindField() { + sql( + "CREATE TABLE ignore_delete (pk INT PRIMARY KEY NOT ENFORCED, v STRING, kind STRING) " + + "WITH ('ignore-update-before' = 'true', 'bucket' = '1', 'rowkind.field' = 'kind')"); + + sql("INSERT INTO ignore_delete VALUES (1, 'A', '+I')"); + assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A", "+I")); + + sql("INSERT INTO ignore_delete VALUES (1, 'A', '-U')"); + assertThat(sql("SELECT * FROM ignore_delete")).containsExactly(Row.of(1, "A", "+I")); + + sql("INSERT INTO ignore_delete VALUES (1, 'A', '-D')"); + assertThat(sql("SELECT * FROM ignore_delete")).isEmpty(); + } + @Test public void testDeleteWithPkLookup() throws Exception { sql( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java index 08fe8cb2386f..eba09e12e59d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/LocalMergeOperatorTest.java @@ -50,10 +50,13 @@ import java.util.Random; import java.util.function.Consumer; +import static org.apache.paimon.CoreOptions.IGNORE_DELETE; +import static org.apache.paimon.CoreOptions.IGNORE_UPDATE_BEFORE; import static org.apache.paimon.CoreOptions.LOCAL_MERGE_BUFFER_SIZE; import static org.apache.paimon.CoreOptions.SEQUENCE_FIELD; import static org.apache.paimon.data.BinaryString.fromString; import static org.apache.paimon.types.RowKind.DELETE; +import static org.apache.paimon.types.RowKind.UPDATE_BEFORE; import static org.assertj.core.api.Assertions.assertThat; class LocalMergeOperatorTest { @@ -114,6 +117,50 @@ public void testUserDefineSequence() throws Exception { result.clear(); } + @Test + public void testIgnoreUpdateBefore() throws Exception { + Map options = new HashMap<>(); + options.put(IGNORE_UPDATE_BEFORE.key(), "true"); + prepareHashOperator(options); + + List result = new ArrayList<>(); + setOutput(result); + + processElement("a", 1); + processElement(UPDATE_BEFORE, "a", 1); + operator.prepareSnapshotPreBarrier(0); + assertThat(result).containsExactlyInAnyOrder("+I:a->1"); + result.clear(); + + processElement("a", 1); + processElement(DELETE, "a", 1); + operator.prepareSnapshotPreBarrier(1); + assertThat(result).containsExactlyInAnyOrder("-D:a->1"); + result.clear(); + } + + @Test + public void testIgnoreDelete() throws Exception { + Map options = new HashMap<>(); + options.put(IGNORE_DELETE.key(), "true"); + prepareHashOperator(options); + + List result = new ArrayList<>(); + setOutput(result); + + processElement("a", 1); + processElement(UPDATE_BEFORE, "a", 1); + operator.prepareSnapshotPreBarrier(0); + assertThat(result).containsExactlyInAnyOrder("+I:a->1"); + result.clear(); + + processElement("a", 1); + processElement(DELETE, "a", 1); + operator.prepareSnapshotPreBarrier(1); + assertThat(result).containsExactlyInAnyOrder("+I:a->1"); + result.clear(); + } + @Test public void testHashSpill() throws Exception { Map options = new HashMap<>();