From 3311a2773279947d7cb9201cad46403e60e84208 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Wed, 8 Jan 2025 13:46:04 +0800 Subject: [PATCH] [flink] Flink batch delete supports partial-update.remove-record-on-sequence-group option --- .../java/org/apache/paimon/CoreOptions.java | 4 -- ...pportsRowLevelOperationFlinkTableSink.java | 37 ++++++++++++----- .../paimon/flink/PartialUpdateITCase.java | 40 ++++++++++++++++++- 3 files changed, 66 insertions(+), 15 deletions(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index acf082177bc0..0158bdcee8fd 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -2201,10 +2201,6 @@ public boolean sequenceFieldSortOrderIsAscending() { return options.get(SEQUENCE_FIELD_SORT_ORDER) == SortOrder.ASCENDING; } - public boolean partialUpdateRemoveRecordOnDelete() { - return options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE); - } - public Optional rowkindField() { return options.getOptional(ROWKIND_FIELD); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java index 4e4c2ff2c67f..c0d19abff213 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java @@ -58,7 +58,10 @@ import static org.apache.paimon.CoreOptions.MERGE_ENGINE; import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE; import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE; +import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE; +import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP; import static org.apache.paimon.CoreOptions.createCommitUser; +import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP; import static org.apache.paimon.utils.Preconditions.checkArgument; /** Flink table sink that supports row level update and delete. */ @@ -185,17 +188,31 @@ private void validateDeletable() { table.getClass().getName())); } - CoreOptions coreOptions = CoreOptions.fromMap(table.options()); - if (coreOptions.mergeEngine() == DEDUPLICATE - || (coreOptions.mergeEngine() == PARTIAL_UPDATE - && coreOptions.partialUpdateRemoveRecordOnDelete())) { - return; - } + Options options = Options.fromMap(table.options()); + MergeEngine mergeEngine = options.get(MERGE_ENGINE); - throw new UnsupportedOperationException( - String.format( - "Merge engine %s can not support batch delete.", - coreOptions.mergeEngine())); + switch (mergeEngine) { + case DEDUPLICATE: + return; + case PARTIAL_UPDATE: + if (options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE) + || options.get(PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP) != null) { + return; + } else { + throw new UnsupportedOperationException( + String.format( + "Merge engine %s doesn't support batch delete by default. To support batch delete, " + + "please set %s to true when there is no %s or set %s.", + mergeEngine, + PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE.key(), + SEQUENCE_GROUP, + PARTIAL_UPDATE_REMOVE_RECORD_ON_SEQUENCE_GROUP)); + } + default: + throw new UnsupportedOperationException( + String.format( + "Merge engine %s can not support batch delete.", mergeEngine)); + } } private boolean canPushDownDeleteFilter() { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java index 76ee8309e8b5..be2d6b34337c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java @@ -623,7 +623,7 @@ public void testIgnoreDelete(boolean localMerge) throws Exception { } @Test - public void testRemoveRecordOnDelete() { + public void testRemoveRecordOnDeleteWithoutSequenceGroup() { sql( "CREATE TABLE remove_record_on_delete (pk INT PRIMARY KEY NOT ENFORCED, a STRING, b STRING) WITH (" + " 'merge-engine' = 'partial-update'," @@ -647,6 +647,44 @@ public void testRemoveRecordOnDelete() { .containsExactlyInAnyOrder(Row.of(1, "A", "apache")); } + @Test + public void testRemoveRecordOnDeleteWithSequenceGroup() throws Exception { + sql( + "CREATE TABLE remove_record_on_delete_sequence_group" + + " (pk INT PRIMARY KEY NOT ENFORCED, a STRING, seq_a INT, b STRING, seq_b INT) WITH (" + + " 'merge-engine' = 'partial-update'," + + " 'fields.seq_a.sequence-group' = 'a'," + + " 'fields.seq_b.sequence-group' = 'b'," + + " 'partial-update.remove-record-on-sequence-group' = 'seq_a'" + + ")"); + + sql("INSERT INTO remove_record_on_delete_sequence_group VALUES (1, 'apple', 2, 'a', 1)"); + sql("INSERT INTO remove_record_on_delete_sequence_group VALUES (1, 'banana', 1, 'b', 2)"); + assertThat(sql("SELECT * FROM remove_record_on_delete_sequence_group")) + .containsExactlyInAnyOrder(Row.of(1, "apple", 2, "b", 2)); + + // delete with seq_b won't delete record but retract b + String id = + TestValuesTableFactory.registerData( + Collections.singletonList( + Row.ofKind(RowKind.DELETE, 1, null, null, "b", 2))); + sEnv.executeSql( + String.format( + "CREATE TEMPORARY TABLE delete_source1 (pk INT, a STRING, seq_a INT, b STRING, seq_b INT) " + + "WITH ('connector'='values', 'bounded'='true', 'data-id'='%s', " + + "'changelog-mode' = 'I,D,UA,UB')", + id)); + sEnv.executeSql( + "INSERT INTO remove_record_on_delete_sequence_group SELECT * FROM delete_source1") + .await(); + assertThat(sql("SELECT * FROM remove_record_on_delete_sequence_group")) + .containsExactlyInAnyOrder(Row.of(1, "apple", 2, null, 2)); + + // delete record + sql("DELETE FROM remove_record_on_delete_sequence_group WHERE pk = 1"); + assertThat(sql("SELECT * FROM remove_record_on_delete_sequence_group")).isEmpty(); + } + @Test public void testRemoveRecordOnDeleteLookup() throws Exception { sql(