From 020ce6d6698663db615ae145763b3835e3d5b8ef Mon Sep 17 00:00:00 2001 From: "chenxinwei.bugfree" Date: Mon, 13 Jan 2025 13:02:29 +0800 Subject: [PATCH 1/8] [core] Introduce ClearConsumersProcedure to clear consumers Change-Id: I05d02f5c48c82d633436260f1ca5f6d0e9ddfcc4 --- .../paimon/consumer/ConsumerManager.java | 22 +++ .../procedure/ClearConsumersProcedure.java | 97 +++++++++++ .../procedure/ClearConsumersProcedure.java | 97 +++++++++++ .../org.apache.paimon.factories.Factory | 1 + .../flink/action/ConsumerActionITCase.java | 155 ++++++++++++++++++ .../apache/paimon/spark/SparkProcedures.java | 2 + .../procedure/ClearConsumersProcedure.java | 125 ++++++++++++++ 7 files changed, 499 insertions(+) create mode 100644 paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java index 839143b68f93..42270b254f5d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java @@ -107,6 +107,28 @@ public void expire(LocalDateTime expireDateTime) { } } + /** Clear consumers. */ + public void clearConsumers(List consumerIds, Boolean clearUnspecified) { + try { + listVersionedFileStatus(fileIO, consumerDirectory(), CONSUMER_PREFIX) + .forEach( + status -> { + String consumerName = + status.getPath() + .getName() + .substring(CONSUMER_PREFIX.length()); + if (consumerIds == null + || (!clearUnspecified && consumerIds.contains(consumerName)) + || (clearUnspecified + && !consumerIds.contains(consumerName))) { + fileIO.deleteQuietly(status.getPath()); + } + }); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + /** Get all consumer. */ public Map consumers() throws IOException { Map consumers = new HashMap<>(); diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java new file mode 100644 index 000000000000..2d3e6b2b7ed4 --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.consumer.ConsumerManager; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.table.procedure.ProcedureContext; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +/** + * Clear consumers procedure. Usage: + * + *

+ *  -- clear all consumers except the specified consumer in the table
+ *  CALL sys.clear_consumers('tableId', 'consumerIds', true)
+ *
+ * -- clear all specified consumers in the table
+ *  CALL sys.clear_consumers('tableId', 'consumerIds') or CALL sys.clear_consumers('tableId', 'consumerIds', false)
+ *
+ *  -- clear all consumers in the table
+ *  CALL sys.clear_unspecified_consumers('tableId')
+ * 
+ */ +public class ClearConsumersProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "clear_consumers"; + + public String[] call( + ProcedureContext procedureContext, + String tableId, + String consumerIds, + Boolean clearUnspecified) + throws Catalog.TableNotExistException { + FileStoreTable fileStoreTable = + (FileStoreTable) catalog.getTable(Identifier.fromString(tableId)); + ConsumerManager consumerManager = + new ConsumerManager( + fileStoreTable.fileIO(), + fileStoreTable.location(), + fileStoreTable.snapshotManager().branch()); + List specifiedConsumerIds = + Optional.of(consumerIds) + .map(s -> Arrays.asList(s.split(","))) + .orElse(Collections.emptyList()); + consumerManager.clearConsumers( + specifiedConsumerIds, Optional.of(clearUnspecified).orElse(false)); + + return new String[] {"Success"}; + } + + public String[] call(ProcedureContext procedureContext, String tableId, String consumerIds) + throws Catalog.TableNotExistException { + return call(procedureContext, tableId, consumerIds, false); + } + + public String[] call(ProcedureContext procedureContext, String tableId) + throws Catalog.TableNotExistException { + FileStoreTable fileStoreTable = + (FileStoreTable) catalog.getTable(Identifier.fromString(tableId)); + ConsumerManager consumerManager = + new ConsumerManager( + fileStoreTable.fileIO(), + fileStoreTable.location(), + fileStoreTable.snapshotManager().branch()); + consumerManager.clearConsumers(null, null); + + return new String[] {"Success"}; + } + + @Override + public String identifier() { + return IDENTIFIER; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java new file mode 100644 index 000000000000..a7869a237a0e --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.consumer.ConsumerManager; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +/** + * Clear consumers procedure. Usage: + * + *

+ *  -- clear all consumers except the specified consumer in the table
+ *  CALL sys.clear_consumers('tableId', 'consumerIds', true)
+ *
+ * -- clear all specified consumers in the table
+ *  CALL sys.clear_consumers('tableId', 'consumerIds') or CALL sys.clear_consumers('tableId', 'consumerIds', false)
+ *
+ *  -- clear all consumers in the table
+ *  CALL sys.clear_unspecified_consumers('tableId')
+ * 
+ */ +public class ClearConsumersProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "clear_consumers"; + + @ProcedureHint( + argument = { + @ArgumentHint(name = "table", type = @DataTypeHint("STRING")), + @ArgumentHint( + name = "consumer_ids", + type = @DataTypeHint("STRING"), + isOptional = true), + @ArgumentHint( + name = "clear_unspecified", + type = @DataTypeHint("BOOLEAN"), + isOptional = true) + }) + public String[] call( + ProcedureContext procedureContext, + String tableId, + String consumerIds, + Boolean clearUnspecified) + throws Catalog.TableNotExistException { + FileStoreTable fileStoreTable = + (FileStoreTable) catalog.getTable(Identifier.fromString(tableId)); + ConsumerManager consumerManager = + new ConsumerManager( + fileStoreTable.fileIO(), + fileStoreTable.location(), + fileStoreTable.snapshotManager().branch()); + if (consumerIds != null) { + List specifiedConsumerIds = + Optional.of(consumerIds) + .map(s -> Arrays.asList(s.split(","))) + .orElse(Collections.emptyList()); + consumerManager.clearConsumers( + specifiedConsumerIds, Optional.ofNullable(clearUnspecified).orElse(false)); + } else { + consumerManager.clearConsumers(null, null); + } + + return new String[] {"Success"}; + } + + @Override + public String identifier() { + return IDENTIFIER; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index db932bb38f9c..4ffb2073efcf 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -84,3 +84,4 @@ org.apache.paimon.flink.procedure.CloneProcedure org.apache.paimon.flink.procedure.CompactManifestProcedure org.apache.paimon.flink.procedure.RefreshObjectTableProcedure org.apache.paimon.flink.procedure.RemoveUnexistingFilesProcedure +org.apache.paimon.flink.procedure.ClearConsumersProcedure \ No newline at end of file diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java index 6fb8c81eb744..1a41d26dbd5d 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java @@ -312,4 +312,159 @@ public void testResetBranchConsumer(String invoker) throws Exception { Optional consumer3 = consumerManager.consumer("myid"); assertThat(consumer3).isNotPresent(); } + + @ParameterizedTest + @Timeout(120) + @ValueSource(strings = {"procedure_indexed", "procedure_named"}) + public void testClearConsumers(String invoker) throws Exception { + init(warehouse); + + RowType rowType = + RowType.of( + new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}, + new String[] {"pk1", "col1"}); + FileStoreTable table = + createFileStoreTable( + rowType, + Collections.emptyList(), + Collections.singletonList("pk1"), + Collections.emptyList(), + Collections.emptyMap()); + + StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); + write = writeBuilder.newWrite(); + commit = writeBuilder.newCommit(); + + // 3 snapshots + writeData(rowData(1L, BinaryString.fromString("Hi"))); + writeData(rowData(2L, BinaryString.fromString("Hello"))); + writeData(rowData(3L, BinaryString.fromString("Paimon"))); + + // use consumer streaming read table + BlockingIterator iterator1 = + testStreamingRead( + "SELECT * FROM `" + + tableName + + "` /*+ OPTIONS('consumer-id'='myid1','consumer.expiration-time'='3h') */", + Arrays.asList( + changelogRow("+I", 1L, "Hi"), + changelogRow("+I", 2L, "Hello"), + changelogRow("+I", 3L, "Paimon"))); + + ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location()); + while (!consumerManager.consumer("myid1").isPresent()) { + Thread.sleep(1000); + } + iterator1.close(); + + // use consumer streaming read table + BlockingIterator iterator2 = + testStreamingRead( + "SELECT * FROM `" + + tableName + + "` /*+ OPTIONS('consumer-id'='myid2','consumer.expiration-time'='3h') */", + Arrays.asList( + changelogRow("+I", 1L, "Hi"), + changelogRow("+I", 2L, "Hello"), + changelogRow("+I", 3L, "Paimon"))); + + while (!consumerManager.consumer("myid2").isPresent()) { + Thread.sleep(1000); + } + iterator2.close(); + + // use consumer streaming read table + BlockingIterator iterator3 = + testStreamingRead( + "SELECT * FROM `" + + tableName + + "` /*+ OPTIONS('consumer-id'='myid3','consumer.expiration-time'='3h') */", + Arrays.asList( + changelogRow("+I", 1L, "Hi"), + changelogRow("+I", 2L, "Hello"), + changelogRow("+I", 3L, "Paimon"))); + + while (!consumerManager.consumer("myid3").isPresent()) { + Thread.sleep(1000); + } + iterator3.close(); + + Optional consumer1 = consumerManager.consumer("myid1"); + Optional consumer2 = consumerManager.consumer("myid2"); + Optional consumer3 = consumerManager.consumer("myid3"); + assertThat(consumer1).isPresent(); + assertThat(consumer2).isPresent(); + assertThat(consumer3).isPresent(); + + // clear all consumers except the specified consumer in the table + switch (invoker) { + case "procedure_indexed": + executeSQL( + String.format( + "CALL sys.clear_consumers('%s.%s', 'myid1,myid2', true)", + database, tableName)); + break; + case "procedure_named": + executeSQL( + String.format( + "CALL sys.clear_consumers(`table` => '%s.%s', consumer_ids => 'myid1,myid2', clear_unspecified => cast(true as BOOLEAN))", + database, tableName)); + break; + default: + throw new UnsupportedOperationException(invoker); + } + + Optional consumer4 = consumerManager.consumer("myid1"); + Optional consumer5 = consumerManager.consumer("myid2"); + Optional consumer6 = consumerManager.consumer("myid3"); + assertThat(consumer4).isPresent(); + assertThat(consumer5).isPresent(); + assertThat(consumer6).isNotPresent(); + + // clear all specified consumers in the table + switch (invoker) { + case "procedure_indexed": + executeSQL( + String.format( + "CALL sys.clear_consumers('%s.%s', 'myid1')", database, tableName)); + break; + case "procedure_named": + executeSQL( + String.format( + "CALL sys.clear_consumers(`table` => '%s.%s', consumer_ids => 'myid1')", + database, tableName)); + break; + default: + throw new UnsupportedOperationException(invoker); + } + + Optional consumer7 = consumerManager.consumer("myid1"); + Optional consumer8 = consumerManager.consumer("myid2"); + Optional consumer9 = consumerManager.consumer("myid3"); + assertThat(consumer7).isNotPresent(); + assertThat(consumer8).isPresent(); + assertThat(consumer9).isNotPresent(); + + // clear all consumers in the table + switch (invoker) { + case "procedure_indexed": + executeSQL(String.format("CALL sys.clear_consumers('%s.%s')", database, tableName)); + break; + case "procedure_named": + executeSQL( + String.format( + "CALL sys.clear_consumers(`table` => '%s.%s')", + database, tableName)); + break; + default: + throw new UnsupportedOperationException(invoker); + } + + Optional consumer10 = consumerManager.consumer("myid1"); + Optional consumer11 = consumerManager.consumer("myid2"); + Optional consumer12 = consumerManager.consumer("myid3"); + assertThat(consumer10).isNotPresent(); + assertThat(consumer11).isNotPresent(); + assertThat(consumer12).isNotPresent(); + } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java index 06f747f606c3..eff62cad96c4 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java @@ -18,6 +18,7 @@ package org.apache.paimon.spark; +import org.apache.paimon.spark.procedure.ClearConsumersProcedure; import org.apache.paimon.spark.procedure.CompactManifestProcedure; import org.apache.paimon.spark.procedure.CompactProcedure; import org.apache.paimon.spark.procedure.CreateBranchProcedure; @@ -100,6 +101,7 @@ private static Map> initProcedureBuilders() { procedureBuilders.put("mark_partition_done", MarkPartitionDoneProcedure::builder); procedureBuilders.put("compact_manifest", CompactManifestProcedure::builder); procedureBuilders.put("refresh_object_table", RefreshObjectTableProcedure::builder); + procedureBuilders.put("clear_consumers", ClearConsumersProcedure::builder); return procedureBuilders.build(); } } diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java new file mode 100644 index 000000000000..4948aea91bd1 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure; + +import org.apache.paimon.consumer.ConsumerManager; +import org.apache.paimon.table.FileStoreTable; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; + +import static org.apache.spark.sql.types.DataTypes.BooleanType; +import static org.apache.spark.sql.types.DataTypes.StringType; + +/** + * Clear consumers procedure. Usage: + * + *

+ *  -- clear all consumers except the specified consumer in the table
+ *  CALL sys.clear_consumers('tableId', 'consumerIds', true)
+ *
+ * -- clear all specified consumers in the table
+ *  CALL sys.clear_consumers('tableId', 'consumerIds') or CALL sys.clear_consumers('tableId', 'consumerIds', false)
+ *
+ *  -- clear all consumers in the table
+ *  CALL sys.clear_unspecified_consumers('tableId')
+ * 
+ */ +public class ClearConsumersProcedure extends BaseProcedure { + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] { + ProcedureParameter.required("table", StringType), + ProcedureParameter.optional("consumerIds", StringType), + ProcedureParameter.optional("clearUnspecified", BooleanType) + }; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { + new StructField("result", DataTypes.BooleanType, true, Metadata.empty()) + }); + + protected ClearConsumersProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); + String consumerIds = args.isNullAt(1) ? null : args.getString(1); + Boolean clearUnspecified = !args.isNullAt(2) && args.getBoolean(2); + return modifyPaimonTable( + tableIdent, + table -> { + FileStoreTable fileStoreTable = (FileStoreTable) table; + ConsumerManager consumerManager = + new ConsumerManager( + fileStoreTable.fileIO(), + fileStoreTable.location(), + fileStoreTable.snapshotManager().branch()); + if (consumerIds != null) { + List specifiedConsumerIds = + Optional.of(consumerIds) + .map(s -> Arrays.asList(s.split(","))) + .orElse(Collections.emptyList()); + consumerManager.clearConsumers(specifiedConsumerIds, clearUnspecified); + } else { + consumerManager.clearConsumers(null, null); + } + + InternalRow outputRow = newInternalRow(true); + return new InternalRow[] {outputRow}; + }); + } + + public static ProcedureBuilder builder() { + return new Builder() { + @Override + public ClearConsumersProcedure doBuild() { + return new ClearConsumersProcedure(tableCatalog()); + } + }; + } + + @Override + public String description() { + return "ClearConsumersProcedure"; + } +} From ffed21cb72bd0b4e3c51e074afb579101c93c6c4 Mon Sep 17 00:00:00 2001 From: "chenxinwei.bugfree" Date: Mon, 13 Jan 2025 16:36:59 +0800 Subject: [PATCH 2/8] Correct incorrect code comments Change-Id: I9e6abe5e52c5c35b263e0a709ceb7c00bf3f8a30 --- .../apache/paimon/flink/procedure/ClearConsumersProcedure.java | 2 +- .../apache/paimon/flink/procedure/ClearConsumersProcedure.java | 2 +- .../apache/paimon/spark/procedure/ClearConsumersProcedure.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java index 2d3e6b2b7ed4..b7b85391b89e 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java @@ -41,7 +41,7 @@ * CALL sys.clear_consumers('tableId', 'consumerIds') or CALL sys.clear_consumers('tableId', 'consumerIds', false) * * -- clear all consumers in the table - * CALL sys.clear_unspecified_consumers('tableId') + * CALL sys.clear_consumers('tableId') * */ public class ClearConsumersProcedure extends ProcedureBase { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java index a7869a237a0e..ffe54b3e6cbe 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java @@ -44,7 +44,7 @@ * CALL sys.clear_consumers('tableId', 'consumerIds') or CALL sys.clear_consumers('tableId', 'consumerIds', false) * * -- clear all consumers in the table - * CALL sys.clear_unspecified_consumers('tableId') + * CALL sys.clear_consumers('tableId') * */ public class ClearConsumersProcedure extends ProcedureBase { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java index 4948aea91bd1..bc5e152ea4a8 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java @@ -48,7 +48,7 @@ * CALL sys.clear_consumers('tableId', 'consumerIds') or CALL sys.clear_consumers('tableId', 'consumerIds', false) * * -- clear all consumers in the table - * CALL sys.clear_unspecified_consumers('tableId') + * CALL sys.clear_consumers('tableId') * */ public class ClearConsumersProcedure extends BaseProcedure { From 89a9e087d1963ff4e3e423fedb86f8d251b9f63b Mon Sep 17 00:00:00 2001 From: "chenxinwei.bugfree" Date: Sun, 19 Jan 2025 15:10:56 +0800 Subject: [PATCH 3/8] Fix comments Change-Id: I66d3d6a774bcaa20a7c473b053490bfe614a7c95 --- docs/content/flink/procedures.md | 25 ++++++ docs/content/spark/procedures.md | 19 ++++ .../paimon/consumer/ConsumerManager.java | 17 ++-- .../procedure/ClearConsumersProcedure.java | 53 ++++++------ .../flink/action/ClearConsumerAction.java | 72 ++++++++++++++++ .../action/ClearConsumerActionFactory.java | 72 ++++++++++++++++ .../procedure/ClearConsumersProcedure.java | 47 +++++----- .../org.apache.paimon.factories.Factory | 1 + .../flink/action/ConsumerActionITCase.java | 86 +++++++++++++------ .../procedure/ClearConsumersProcedure.java | 48 ++++++----- 10 files changed, 334 insertions(+), 106 deletions(-) create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ClearConsumerAction.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ClearConsumerActionFactory.java diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md index a58d4713b7f0..d759e12bf8bd 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -388,6 +388,31 @@ All available procedures are listed below. CALL sys.reset_consumer(`table` => 'default.T', consumer_id => 'myid', next_snapshot_id => cast(10 as bigint)) + + clear_consumers + + -- Use named argument
+ CALL [catalog.]sys.clear_consumers(`table` => 'identifier', including_consumers => 'includingConsumers', excluding_consumers => 'excludingConsumers')

+ -- Use indexed argument
+ -- clear all consumers in the table + CALL [catalog.]sys.clear_consumers('identifier') + -- clear some consumers in the table (accept regular expression)
+ CALL [catalog.]sys.clear_consumers('tableId', 'includingConsumers')

+ -- exclude some consumers (accept regular expression)
+ CALL [catalog.]sys.clear_consumers('tableId', 'consumerIds', 'includingConsumers', 'excludingConsumers') + + + To reset or delete consumer. Arguments: +
  • identifier: the target table identifier. Cannot be empty.
  • +
  • includingConsumers: consumers to be cleared.
  • +
  • excludingConsumers: consumers which not to be cleared.
  • + + CALL sys.clear_consumers(`table` => 'default.T')

    + CALL sys.clear_consumers(`table` => 'default.T', including_consumers => 'myid.*')

    + CALL sys.reset_consumer(table => 'default.T', including_consumers => '', excluding_consumers => 'myid1.*')

    + CALL sys.reset_consumer(table => 'default.T', including_consumers => 'myid.*', excluding_consumers => 'myid1.*') + + rollback_to diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index 2978cd68e350..d140521c05e0 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -338,6 +338,25 @@ This section introduce all available spark procedures about paimon. -- delete consumer
    CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid') + + + clear_consumers + + To clear consumers. Arguments: +
  • identifier: the target table identifier. Cannot be empty.
  • +
  • includingConsumers: consumers to be cleared.
  • +
  • excludingConsumers: consumers which not to be cleared.
  • + + + -- clear all consumers in the table
    + CALL sys.clear_consumers(table => 'default.T')

    + -- clear some consumers in the table (accept regular expression)
    + CALL sys.reset_consumer(table => 'default.T', includingConsumers => 'myid.*')

    + -- clear all consumers except excludingConsumers in the table (accept regular expression)
    + CALL sys.reset_consumer(table => 'default.T', includingConsumers => '', excludingConsumers => 'myid1.*')

    + -- clear all consumers with includingConsumers and excludingConsumers (accept regular expression)
    + CALL sys.reset_consumer(table => 'default.T', includingConsumers => 'myid.*', excludingConsumers => 'myid1.*') + mark_partition_done diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java index 42270b254f5d..8caf386e63e8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java @@ -32,6 +32,7 @@ import java.util.Map; import java.util.Optional; import java.util.OptionalLong; +import java.util.regex.Pattern; import java.util.stream.Collectors; import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH; @@ -108,7 +109,7 @@ public void expire(LocalDateTime expireDateTime) { } /** Clear consumers. */ - public void clearConsumers(List consumerIds, Boolean clearUnspecified) { + public void clearConsumers(Pattern includingPattern, Pattern excludingPattern) { try { listVersionedFileStatus(fileIO, consumerDirectory(), CONSUMER_PREFIX) .forEach( @@ -117,10 +118,16 @@ public void clearConsumers(List consumerIds, Boolean clearUnspecified) { status.getPath() .getName() .substring(CONSUMER_PREFIX.length()); - if (consumerIds == null - || (!clearUnspecified && consumerIds.contains(consumerName)) - || (clearUnspecified - && !consumerIds.contains(consumerName))) { + boolean shouldCompaction = + includingPattern.matcher(consumerName).matches(); + if (excludingPattern != null) { + shouldCompaction = + shouldCompaction + && !excludingPattern + .matcher(consumerName) + .matches(); + } + if (shouldCompaction) { fileIO.deleteQuietly(status.getPath()); } }); diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java index b7b85391b89e..1af693187d9e 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java @@ -25,23 +25,22 @@ import org.apache.flink.table.procedure.ProcedureContext; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Optional; +import java.util.regex.Pattern; /** * Clear consumers procedure. Usage: * *
    
    - *  -- clear all consumers except the specified consumer in the table
    - *  CALL sys.clear_consumers('tableId', 'consumerIds', true)
    - *
    - * -- clear all specified consumers in the table
    - *  CALL sys.clear_consumers('tableId', 'consumerIds') or CALL sys.clear_consumers('tableId', 'consumerIds', false)
    + *  -- NOTE: use '' as placeholder for optional arguments
      *
      *  -- clear all consumers in the table
      *  CALL sys.clear_consumers('tableId')
    + *
    + * -- clear some consumers in the table (accept regular expression)
    + *  CALL sys.clear_consumers('tableId', 'includingConsumers')
    + *
    + * -- exclude some consumers (accept regular expression)
    + *  CALL sys.clear_consumers('tableId', 'consumerIds', 'includingConsumers', 'excludingConsumers')
      * 
    */ public class ClearConsumersProcedure extends ProcedureBase { @@ -51,8 +50,8 @@ public class ClearConsumersProcedure extends ProcedureBase { public String[] call( ProcedureContext procedureContext, String tableId, - String consumerIds, - Boolean clearUnspecified) + String includingConsumers, + String excludingConsumers) throws Catalog.TableNotExistException { FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(Identifier.fromString(tableId)); @@ -61,33 +60,29 @@ public String[] call( fileStoreTable.fileIO(), fileStoreTable.location(), fileStoreTable.snapshotManager().branch()); - List specifiedConsumerIds = - Optional.of(consumerIds) - .map(s -> Arrays.asList(s.split(","))) - .orElse(Collections.emptyList()); - consumerManager.clearConsumers( - specifiedConsumerIds, Optional.of(clearUnspecified).orElse(false)); + + includingConsumers = nullable(includingConsumers); + excludingConsumers = nullable(excludingConsumers); + Pattern includingPattern = + includingConsumers == null + ? Pattern.compile(".*") + : Pattern.compile(includingConsumers); + Pattern excludingPattern = + excludingConsumers == null ? null : Pattern.compile(excludingConsumers); + consumerManager.clearConsumers(includingPattern, excludingPattern); return new String[] {"Success"}; } - public String[] call(ProcedureContext procedureContext, String tableId, String consumerIds) + public String[] call( + ProcedureContext procedureContext, String tableId, String includingConsumers) throws Catalog.TableNotExistException { - return call(procedureContext, tableId, consumerIds, false); + return call(procedureContext, tableId, includingConsumers, null); } public String[] call(ProcedureContext procedureContext, String tableId) throws Catalog.TableNotExistException { - FileStoreTable fileStoreTable = - (FileStoreTable) catalog.getTable(Identifier.fromString(tableId)); - ConsumerManager consumerManager = - new ConsumerManager( - fileStoreTable.fileIO(), - fileStoreTable.location(), - fileStoreTable.snapshotManager().branch()); - consumerManager.clearConsumers(null, null); - - return new String[] {"Success"}; + return call(procedureContext, tableId, null, null); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ClearConsumerAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ClearConsumerAction.java new file mode 100644 index 000000000000..67ca15f5553f --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ClearConsumerAction.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.consumer.ConsumerManager; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.StringUtils; + +import javax.annotation.Nullable; + +import java.util.Map; +import java.util.regex.Pattern; + +/** Clear consumers action for Flink. */ +public class ClearConsumerAction extends TableActionBase { + + private String includingConsumers; + private String excludingConsumers; + + protected ClearConsumerAction( + String databaseName, String tableName, Map catalogConfig) { + super(databaseName, tableName, catalogConfig); + } + + public ClearConsumerAction withIncludingConsumers(@Nullable String includingConsumers) { + this.includingConsumers = includingConsumers; + return this; + } + + public ClearConsumerAction withExcludingConsumers(@Nullable String excludingConsumers) { + this.excludingConsumers = excludingConsumers; + return this; + } + + @Override + public void run() throws Exception { + FileStoreTable dataTable = (FileStoreTable) table; + ConsumerManager consumerManager = + new ConsumerManager( + dataTable.fileIO(), + dataTable.location(), + dataTable.snapshotManager().branch()); + + includingConsumers = + StringUtils.isNullOrWhitespaceOnly(includingConsumers) ? null : includingConsumers; + excludingConsumers = + StringUtils.isNullOrWhitespaceOnly(excludingConsumers) ? null : excludingConsumers; + Pattern includingPattern = + includingConsumers == null + ? Pattern.compile(".*") + : Pattern.compile(includingConsumers); + Pattern excludingPattern = + excludingConsumers == null ? null : Pattern.compile(excludingConsumers); + consumerManager.clearConsumers(includingPattern, excludingPattern); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ClearConsumerActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ClearConsumerActionFactory.java new file mode 100644 index 000000000000..f027c30be184 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ClearConsumerActionFactory.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import java.util.Optional; + +/** Factory to create {@link ClearConsumerAction}. */ +public class ClearConsumerActionFactory implements ActionFactory { + + public static final String IDENTIFIER = "clear_consumers"; + + private static final String INCLUDING_CONSUMERS = "including_consumers"; + private static final String EXCLUDING_CONSUMERS = "excluding_consumers"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Optional create(MultipleParameterToolAdapter params) { + ClearConsumerAction action = + new ClearConsumerAction( + params.getRequired(DATABASE), + params.getRequired(TABLE), + catalogConfigMap(params)); + + if (params.has(INCLUDING_CONSUMERS)) { + action.withIncludingConsumers(params.get(INCLUDING_CONSUMERS)); + } + + if (params.has(EXCLUDING_CONSUMERS)) { + action.withExcludingConsumers(params.get(EXCLUDING_CONSUMERS)); + } + + return Optional.of(action); + } + + @Override + public void printHelp() { + System.out.println( + "Action \"clear_consumers\" clear consumers with including consumers and excluding consumers."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " clear_consumers --warehouse --database " + + "--table [--including_consumers --excluding_consumers ]"); + + System.out.println(); + System.out.println("Note:"); + System.out.println( + " use '' as placeholder for including_consumers if you want to clear all consumers except excludingConsumers in the table."); + System.out.println(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java index ffe54b3e6cbe..01debb2808f2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java @@ -28,23 +28,22 @@ import org.apache.flink.table.annotation.ProcedureHint; import org.apache.flink.table.procedure.ProcedureContext; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Optional; +import java.util.regex.Pattern; /** * Clear consumers procedure. Usage: * *
    
    - *  -- clear all consumers except the specified consumer in the table
    - *  CALL sys.clear_consumers('tableId', 'consumerIds', true)
    - *
    - * -- clear all specified consumers in the table
    - *  CALL sys.clear_consumers('tableId', 'consumerIds') or CALL sys.clear_consumers('tableId', 'consumerIds', false)
    + *  -- NOTE: use '' as placeholder for optional arguments
      *
      *  -- clear all consumers in the table
      *  CALL sys.clear_consumers('tableId')
    + *
    + * -- clear some consumers in the table (accept regular expression)
    + *  CALL sys.clear_consumers('tableId', 'includingConsumers')
    + *
    + * -- exclude some consumers (accept regular expression)
    + *  CALL sys.clear_consumers('tableId', 'consumerIds', 'includingConsumers', 'excludingConsumers')
      * 
    */ public class ClearConsumersProcedure extends ProcedureBase { @@ -55,19 +54,19 @@ public class ClearConsumersProcedure extends ProcedureBase { argument = { @ArgumentHint(name = "table", type = @DataTypeHint("STRING")), @ArgumentHint( - name = "consumer_ids", + name = "including_consumers", type = @DataTypeHint("STRING"), isOptional = true), @ArgumentHint( - name = "clear_unspecified", - type = @DataTypeHint("BOOLEAN"), + name = "excluding_consumers", + type = @DataTypeHint("STRING"), isOptional = true) }) public String[] call( ProcedureContext procedureContext, String tableId, - String consumerIds, - Boolean clearUnspecified) + String includingConsumers, + String excludingConsumers) throws Catalog.TableNotExistException { FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(Identifier.fromString(tableId)); @@ -76,16 +75,16 @@ public String[] call( fileStoreTable.fileIO(), fileStoreTable.location(), fileStoreTable.snapshotManager().branch()); - if (consumerIds != null) { - List specifiedConsumerIds = - Optional.of(consumerIds) - .map(s -> Arrays.asList(s.split(","))) - .orElse(Collections.emptyList()); - consumerManager.clearConsumers( - specifiedConsumerIds, Optional.ofNullable(clearUnspecified).orElse(false)); - } else { - consumerManager.clearConsumers(null, null); - } + + includingConsumers = nullable(includingConsumers); + excludingConsumers = nullable(excludingConsumers); + Pattern includingPattern = + includingConsumers == null + ? Pattern.compile(".*") + : Pattern.compile(includingConsumers); + Pattern excludingPattern = + excludingConsumers == null ? null : Pattern.compile(excludingConsumers); + consumerManager.clearConsumers(includingPattern, excludingPattern); return new String[] {"Success"}; } diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 4ffb2073efcf..6f6becf85fc7 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -44,6 +44,7 @@ org.apache.paimon.flink.action.RepairActionFactory org.apache.paimon.flink.action.RewriteFileIndexActionFactory org.apache.paimon.flink.action.ExpireSnapshotsActionFactory org.apache.paimon.flink.action.RemoveUnexistingFilesActionFactory +org.apache.paimon.flink.action.ClearConsumerActionFactory ### procedure factories org.apache.paimon.flink.procedure.CompactDatabaseProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java index 1a41d26dbd5d..b24e964f0cff 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java @@ -315,7 +315,7 @@ public void testResetBranchConsumer(String invoker) throws Exception { @ParameterizedTest @Timeout(120) - @ValueSource(strings = {"procedure_indexed", "procedure_named"}) + @ValueSource(strings = {"action", "procedure_indexed", "procedure_named"}) public void testClearConsumers(String invoker) throws Exception { init(warehouse); @@ -345,14 +345,14 @@ public void testClearConsumers(String invoker) throws Exception { testStreamingRead( "SELECT * FROM `" + tableName - + "` /*+ OPTIONS('consumer-id'='myid1','consumer.expiration-time'='3h') */", + + "` /*+ OPTIONS('consumer-id'='myid1_1','consumer.expiration-time'='3h') */", Arrays.asList( changelogRow("+I", 1L, "Hi"), changelogRow("+I", 2L, "Hello"), changelogRow("+I", 3L, "Paimon"))); ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location()); - while (!consumerManager.consumer("myid1").isPresent()) { + while (!consumerManager.consumer("myid1_1").isPresent()) { Thread.sleep(1000); } iterator1.close(); @@ -362,13 +362,13 @@ public void testClearConsumers(String invoker) throws Exception { testStreamingRead( "SELECT * FROM `" + tableName - + "` /*+ OPTIONS('consumer-id'='myid2','consumer.expiration-time'='3h') */", + + "` /*+ OPTIONS('consumer-id'='myid1_2','consumer.expiration-time'='3h') */", Arrays.asList( changelogRow("+I", 1L, "Hi"), changelogRow("+I", 2L, "Hello"), changelogRow("+I", 3L, "Paimon"))); - while (!consumerManager.consumer("myid2").isPresent()) { + while (!consumerManager.consumer("myid1_2").isPresent()) { Thread.sleep(1000); } iterator2.close(); @@ -378,75 +378,111 @@ public void testClearConsumers(String invoker) throws Exception { testStreamingRead( "SELECT * FROM `" + tableName - + "` /*+ OPTIONS('consumer-id'='myid3','consumer.expiration-time'='3h') */", + + "` /*+ OPTIONS('consumer-id'='myid2','consumer.expiration-time'='3h') */", Arrays.asList( changelogRow("+I", 1L, "Hi"), changelogRow("+I", 2L, "Hello"), changelogRow("+I", 3L, "Paimon"))); - while (!consumerManager.consumer("myid3").isPresent()) { + while (!consumerManager.consumer("myid2").isPresent()) { Thread.sleep(1000); } iterator3.close(); - Optional consumer1 = consumerManager.consumer("myid1"); - Optional consumer2 = consumerManager.consumer("myid2"); - Optional consumer3 = consumerManager.consumer("myid3"); + Optional consumer1 = consumerManager.consumer("myid1_1"); + Optional consumer2 = consumerManager.consumer("myid1_2"); + Optional consumer3 = consumerManager.consumer("myid2"); assertThat(consumer1).isPresent(); assertThat(consumer2).isPresent(); assertThat(consumer3).isPresent(); - // clear all consumers except the specified consumer in the table + List args = + Arrays.asList( + "clear_consumers", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--including_consumers", + "", + "--excluding_consumers", + "myid1.+"); + + // clear all consumers except the excluding_consumers in the table switch (invoker) { + case "action": + createAction(ClearConsumerAction.class, args).run(); + break; case "procedure_indexed": executeSQL( String.format( - "CALL sys.clear_consumers('%s.%s', 'myid1,myid2', true)", + "CALL sys.clear_consumers('%s.%s', '', 'myid1.+')", database, tableName)); break; case "procedure_named": executeSQL( String.format( - "CALL sys.clear_consumers(`table` => '%s.%s', consumer_ids => 'myid1,myid2', clear_unspecified => cast(true as BOOLEAN))", + "CALL sys.clear_consumers(`table` => '%s.%s', including_consumers => '', excluding_consumers => 'myid1.+')", database, tableName)); break; default: throw new UnsupportedOperationException(invoker); } - Optional consumer4 = consumerManager.consumer("myid1"); - Optional consumer5 = consumerManager.consumer("myid2"); - Optional consumer6 = consumerManager.consumer("myid3"); + Optional consumer4 = consumerManager.consumer("myid1_1"); + Optional consumer5 = consumerManager.consumer("myid1_2"); + Optional consumer6 = consumerManager.consumer("myid2"); assertThat(consumer4).isPresent(); assertThat(consumer5).isPresent(); assertThat(consumer6).isNotPresent(); - // clear all specified consumers in the table + args = + Arrays.asList( + "clear_consumers", + "--warehouse", + warehouse, + "--database", + database, + "--table", + tableName, + "--including_consumers", + "myid1_1"); + + // clear consumers in the table that meet the including_consumers expression switch (invoker) { + case "action": + createAction(ClearConsumerAction.class, args).run(); + break; case "procedure_indexed": executeSQL( String.format( - "CALL sys.clear_consumers('%s.%s', 'myid1')", database, tableName)); + "CALL sys.clear_consumers('%s.%s', 'myid1_1')", + database, tableName)); break; case "procedure_named": executeSQL( String.format( - "CALL sys.clear_consumers(`table` => '%s.%s', consumer_ids => 'myid1')", + "CALL sys.clear_consumers(`table` => '%s.%s', including_consumers => 'myid1_1')", database, tableName)); break; default: throw new UnsupportedOperationException(invoker); } - Optional consumer7 = consumerManager.consumer("myid1"); - Optional consumer8 = consumerManager.consumer("myid2"); - Optional consumer9 = consumerManager.consumer("myid3"); + Optional consumer7 = consumerManager.consumer("myid1_1"); + Optional consumer8 = consumerManager.consumer("myid1_2"); + Optional consumer9 = consumerManager.consumer("myid2"); assertThat(consumer7).isNotPresent(); assertThat(consumer8).isPresent(); assertThat(consumer9).isNotPresent(); // clear all consumers in the table switch (invoker) { + case "action": + createAction(ClearConsumerAction.class, args.subList(0, 7)).run(); + break; case "procedure_indexed": executeSQL(String.format("CALL sys.clear_consumers('%s.%s')", database, tableName)); break; @@ -460,9 +496,9 @@ public void testClearConsumers(String invoker) throws Exception { throw new UnsupportedOperationException(invoker); } - Optional consumer10 = consumerManager.consumer("myid1"); - Optional consumer11 = consumerManager.consumer("myid2"); - Optional consumer12 = consumerManager.consumer("myid3"); + Optional consumer10 = consumerManager.consumer("myid1_1"); + Optional consumer11 = consumerManager.consumer("myid1_2"); + Optional consumer12 = consumerManager.consumer("myid2"); assertThat(consumer10).isNotPresent(); assertThat(consumer11).isNotPresent(); assertThat(consumer12).isNotPresent(); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java index bc5e152ea4a8..73065add014a 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java @@ -20,6 +20,7 @@ import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.StringUtils; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.catalog.Identifier; @@ -29,26 +30,24 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Optional; +import java.util.regex.Pattern; -import static org.apache.spark.sql.types.DataTypes.BooleanType; import static org.apache.spark.sql.types.DataTypes.StringType; /** * Clear consumers procedure. Usage: * *
    
    - *  -- clear all consumers except the specified consumer in the table
    - *  CALL sys.clear_consumers('tableId', 'consumerIds', true)
    - *
    - * -- clear all specified consumers in the table
    - *  CALL sys.clear_consumers('tableId', 'consumerIds') or CALL sys.clear_consumers('tableId', 'consumerIds', false)
    + *  -- NOTE: use '' as placeholder for optional arguments
      *
      *  -- clear all consumers in the table
      *  CALL sys.clear_consumers('tableId')
    + *
    + * -- clear some consumers in the table (accept regular expression)
    + *  CALL sys.clear_consumers('tableId', 'includingConsumers')
    + *
    + * -- exclude some consumers (accept regular expression)
    + *  CALL sys.clear_consumers('tableId', 'consumerIds', 'includingConsumers', 'excludingConsumers')
      * 
    */ public class ClearConsumersProcedure extends BaseProcedure { @@ -56,8 +55,8 @@ public class ClearConsumersProcedure extends BaseProcedure { private static final ProcedureParameter[] PARAMETERS = new ProcedureParameter[] { ProcedureParameter.required("table", StringType), - ProcedureParameter.optional("consumerIds", StringType), - ProcedureParameter.optional("clearUnspecified", BooleanType) + ProcedureParameter.optional("includingConsumers", StringType), + ProcedureParameter.optional("excludingConsumers", StringType) }; private static final StructType OUTPUT_TYPE = @@ -83,8 +82,19 @@ public StructType outputType() { @Override public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); - String consumerIds = args.isNullAt(1) ? null : args.getString(1); - Boolean clearUnspecified = !args.isNullAt(2) && args.getBoolean(2); + String includingConsumers = args.isNullAt(1) ? null : args.getString(1); + includingConsumers = + StringUtils.isNullOrWhitespaceOnly(includingConsumers) ? null : includingConsumers; + String excludingConsumers = args.isNullAt(2) ? null : args.getString(2); + excludingConsumers = + StringUtils.isNullOrWhitespaceOnly(excludingConsumers) ? null : excludingConsumers; + Pattern includingPattern = + includingConsumers == null + ? Pattern.compile(".*") + : Pattern.compile(includingConsumers); + Pattern excludingPattern = + excludingConsumers == null ? null : Pattern.compile(excludingConsumers); + return modifyPaimonTable( tableIdent, table -> { @@ -94,15 +104,7 @@ public InternalRow[] call(InternalRow args) { fileStoreTable.fileIO(), fileStoreTable.location(), fileStoreTable.snapshotManager().branch()); - if (consumerIds != null) { - List specifiedConsumerIds = - Optional.of(consumerIds) - .map(s -> Arrays.asList(s.split(","))) - .orElse(Collections.emptyList()); - consumerManager.clearConsumers(specifiedConsumerIds, clearUnspecified); - } else { - consumerManager.clearConsumers(null, null); - } + consumerManager.clearConsumers(includingPattern, excludingPattern); InternalRow outputRow = newInternalRow(true); return new InternalRow[] {outputRow}; From 985da36f0856bab052b2d7470edc4b2264b31216 Mon Sep 17 00:00:00 2001 From: "chenxinwei.bugfree" Date: Mon, 20 Jan 2025 20:44:26 +0800 Subject: [PATCH 4/8] Fix comments Change-Id: Id3912c1c2c90a7beaf41fba1d9c35f162ad6d605 --- docs/content/flink/procedures.md | 2 +- .../apache/paimon/flink/procedure/ClearConsumersProcedure.java | 2 +- .../apache/paimon/flink/procedure/ClearConsumersProcedure.java | 2 +- .../apache/paimon/spark/procedure/ClearConsumersProcedure.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md index d759e12bf8bd..3a302f501f7a 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -399,7 +399,7 @@ All available procedures are listed below. -- clear some consumers in the table (accept regular expression)
    CALL [catalog.]sys.clear_consumers('tableId', 'includingConsumers')

    -- exclude some consumers (accept regular expression)
    - CALL [catalog.]sys.clear_consumers('tableId', 'consumerIds', 'includingConsumers', 'excludingConsumers') + CALL [catalog.]sys.clear_consumers('tableId', 'includingConsumers', 'excludingConsumers') To reset or delete consumer. Arguments: diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java index 1af693187d9e..27190c870022 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java @@ -40,7 +40,7 @@ * CALL sys.clear_consumers('tableId', 'includingConsumers') * * -- exclude some consumers (accept regular expression) - * CALL sys.clear_consumers('tableId', 'consumerIds', 'includingConsumers', 'excludingConsumers') + * CALL sys.clear_consumers('tableId', 'includingConsumers', 'excludingConsumers') * */ public class ClearConsumersProcedure extends ProcedureBase { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java index 01debb2808f2..d1f664eef017 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java @@ -43,7 +43,7 @@ * CALL sys.clear_consumers('tableId', 'includingConsumers') * * -- exclude some consumers (accept regular expression) - * CALL sys.clear_consumers('tableId', 'consumerIds', 'includingConsumers', 'excludingConsumers') + * CALL sys.clear_consumers('tableId', 'includingConsumers', 'excludingConsumers') * */ public class ClearConsumersProcedure extends ProcedureBase { diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java index 73065add014a..0627ce58d3f1 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java @@ -47,7 +47,7 @@ * CALL sys.clear_consumers('tableId', 'includingConsumers') * * -- exclude some consumers (accept regular expression) - * CALL sys.clear_consumers('tableId', 'consumerIds', 'includingConsumers', 'excludingConsumers') + * CALL sys.clear_consumers('tableId', 'includingConsumers', 'excludingConsumers') * */ public class ClearConsumersProcedure extends BaseProcedure { From 9bb47a9277ddc7952064853c2e6bb6af369f9b32 Mon Sep 17 00:00:00 2001 From: "chenxinwei.bugfree" Date: Mon, 20 Jan 2025 20:50:36 +0800 Subject: [PATCH 5/8] Fix comment Change-Id: I2b94e9f7519b36cbd4f2eed8c82a84809ad8643b --- docs/content/flink/procedures.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md index 3a302f501f7a..2a6439e374b2 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -397,9 +397,9 @@ All available procedures are listed below. -- clear all consumers in the table CALL [catalog.]sys.clear_consumers('identifier') -- clear some consumers in the table (accept regular expression)
    - CALL [catalog.]sys.clear_consumers('tableId', 'includingConsumers')

    + CALL [catalog.]sys.clear_consumers('identifier', 'includingConsumers')

    -- exclude some consumers (accept regular expression)
    - CALL [catalog.]sys.clear_consumers('tableId', 'includingConsumers', 'excludingConsumers') + CALL [catalog.]sys.clear_consumers('identifier', 'includingConsumers', 'excludingConsumers') To reset or delete consumer. Arguments: From 92da5749c85760a8566d1bfce1a4bf4f1023df1d Mon Sep 17 00:00:00 2001 From: "chenxinwei.bugfree" Date: Fri, 24 Jan 2025 12:33:07 +0800 Subject: [PATCH 6/8] Fix comments Change-Id: I2bbdfb3e81396b0eae0bc7b33449ae46cb8a93ef --- .../apache/paimon/consumer/ConsumerManager.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java index 8caf386e63e8..4762238b6ff4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java @@ -113,22 +113,23 @@ public void clearConsumers(Pattern includingPattern, Pattern excludingPattern) { try { listVersionedFileStatus(fileIO, consumerDirectory(), CONSUMER_PREFIX) .forEach( - status -> { + fileStatus -> { String consumerName = - status.getPath() + fileStatus + .getPath() .getName() .substring(CONSUMER_PREFIX.length()); - boolean shouldCompaction = + boolean shouldClear = includingPattern.matcher(consumerName).matches(); if (excludingPattern != null) { - shouldCompaction = - shouldCompaction + shouldClear = + shouldClear && !excludingPattern .matcher(consumerName) .matches(); } - if (shouldCompaction) { - fileIO.deleteQuietly(status.getPath()); + if (shouldClear) { + fileIO.deleteQuietly(fileStatus.getPath()); } }); } catch (IOException e) { From 257bb7ab409be7b44ff2f9fa33572df2f09c877e Mon Sep 17 00:00:00 2001 From: "chenxinwei.bugfree" Date: Wed, 5 Feb 2025 20:27:25 +0800 Subject: [PATCH 7/8] Fix comments Change-Id: I0064aa87a79b7e477d64e8b49fc2f58d04717359 --- .../flink/procedure/ClearConsumersProcedure.java | 9 +++++---- .../paimon/flink/action/ClearConsumerAction.java | 10 ++++------ .../flink/procedure/ClearConsumersProcedure.java | 9 +++++---- .../spark/procedure/ClearConsumersProcedure.java | 10 ++++------ 4 files changed, 18 insertions(+), 20 deletions(-) diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java index 27190c870022..a3f6713f4cdb 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java @@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.StringUtils; import org.apache.flink.table.procedure.ProcedureContext; @@ -61,14 +62,14 @@ public String[] call( fileStoreTable.location(), fileStoreTable.snapshotManager().branch()); - includingConsumers = nullable(includingConsumers); - excludingConsumers = nullable(excludingConsumers); Pattern includingPattern = - includingConsumers == null + StringUtils.isNullOrWhitespaceOnly(includingConsumers) ? Pattern.compile(".*") : Pattern.compile(includingConsumers); Pattern excludingPattern = - excludingConsumers == null ? null : Pattern.compile(excludingConsumers); + StringUtils.isNullOrWhitespaceOnly(excludingConsumers) + ? null + : Pattern.compile(excludingConsumers); consumerManager.clearConsumers(includingPattern, excludingPattern); return new String[] {"Success"}; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ClearConsumerAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ClearConsumerAction.java index 67ca15f5553f..37708ecd9330 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ClearConsumerAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ClearConsumerAction.java @@ -57,16 +57,14 @@ public void run() throws Exception { dataTable.location(), dataTable.snapshotManager().branch()); - includingConsumers = - StringUtils.isNullOrWhitespaceOnly(includingConsumers) ? null : includingConsumers; - excludingConsumers = - StringUtils.isNullOrWhitespaceOnly(excludingConsumers) ? null : excludingConsumers; Pattern includingPattern = - includingConsumers == null + StringUtils.isNullOrWhitespaceOnly(includingConsumers) ? Pattern.compile(".*") : Pattern.compile(includingConsumers); Pattern excludingPattern = - excludingConsumers == null ? null : Pattern.compile(excludingConsumers); + StringUtils.isNullOrWhitespaceOnly(excludingConsumers) + ? null + : Pattern.compile(excludingConsumers); consumerManager.clearConsumers(includingPattern, excludingPattern); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java index d1f664eef017..de4c371d30a3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java @@ -22,6 +22,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.StringUtils; import org.apache.flink.table.annotation.ArgumentHint; import org.apache.flink.table.annotation.DataTypeHint; @@ -76,14 +77,14 @@ public String[] call( fileStoreTable.location(), fileStoreTable.snapshotManager().branch()); - includingConsumers = nullable(includingConsumers); - excludingConsumers = nullable(excludingConsumers); Pattern includingPattern = - includingConsumers == null + StringUtils.isNullOrWhitespaceOnly(includingConsumers) ? Pattern.compile(".*") : Pattern.compile(includingConsumers); Pattern excludingPattern = - excludingConsumers == null ? null : Pattern.compile(excludingConsumers); + StringUtils.isNullOrWhitespaceOnly(excludingConsumers) + ? null + : Pattern.compile(excludingConsumers); consumerManager.clearConsumers(includingPattern, excludingPattern); return new String[] {"Success"}; diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java index 0627ce58d3f1..cdde1c6f8350 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java @@ -83,17 +83,15 @@ public StructType outputType() { public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); String includingConsumers = args.isNullAt(1) ? null : args.getString(1); - includingConsumers = - StringUtils.isNullOrWhitespaceOnly(includingConsumers) ? null : includingConsumers; String excludingConsumers = args.isNullAt(2) ? null : args.getString(2); - excludingConsumers = - StringUtils.isNullOrWhitespaceOnly(excludingConsumers) ? null : excludingConsumers; Pattern includingPattern = - includingConsumers == null + StringUtils.isNullOrWhitespaceOnly(includingConsumers) ? Pattern.compile(".*") : Pattern.compile(includingConsumers); Pattern excludingPattern = - excludingConsumers == null ? null : Pattern.compile(excludingConsumers); + StringUtils.isNullOrWhitespaceOnly(excludingConsumers) + ? null + : Pattern.compile(excludingConsumers); return modifyPaimonTable( tableIdent, From 247d1a7b04d3b14d074e67ad87234bce90ddeb4b Mon Sep 17 00:00:00 2001 From: "chenxinwei.bugfree" Date: Thu, 6 Feb 2025 12:08:46 +0800 Subject: [PATCH 8/8] Fix bugs Change-Id: Iab746c53db5c0bdaf4b2cacd80ea67cd99342f5a --- docs/content/flink/procedures.md | 4 ++-- docs/content/spark/procedures.md | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md index 2a6439e374b2..8189aeaa96d0 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -409,8 +409,8 @@ All available procedures are listed below. CALL sys.clear_consumers(`table` => 'default.T')

    CALL sys.clear_consumers(`table` => 'default.T', including_consumers => 'myid.*')

    - CALL sys.reset_consumer(table => 'default.T', including_consumers => '', excluding_consumers => 'myid1.*')

    - CALL sys.reset_consumer(table => 'default.T', including_consumers => 'myid.*', excluding_consumers => 'myid1.*') + CALL sys.clear_consumers(table => 'default.T', including_consumers => '', excluding_consumers => 'myid1.*')

    + CALL sys.clear_consumers(table => 'default.T', including_consumers => 'myid.*', excluding_consumers => 'myid1.*') diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index d140521c05e0..66802ac28e60 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -351,11 +351,11 @@ This section introduce all available spark procedures about paimon. -- clear all consumers in the table
    CALL sys.clear_consumers(table => 'default.T')

    -- clear some consumers in the table (accept regular expression)
    - CALL sys.reset_consumer(table => 'default.T', includingConsumers => 'myid.*')

    + CALL sys.clear_consumers(table => 'default.T', includingConsumers => 'myid.*')

    -- clear all consumers except excludingConsumers in the table (accept regular expression)
    - CALL sys.reset_consumer(table => 'default.T', includingConsumers => '', excludingConsumers => 'myid1.*')

    + CALL sys.clear_consumers(table => 'default.T', includingConsumers => '', excludingConsumers => 'myid1.*')

    -- clear all consumers with includingConsumers and excludingConsumers (accept regular expression)
    - CALL sys.reset_consumer(table => 'default.T', includingConsumers => 'myid.*', excludingConsumers => 'myid1.*') + CALL sys.clear_consumers(table => 'default.T', includingConsumers => 'myid.*', excludingConsumers => 'myid1.*')