diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index a58d4713b7f0..8189aeaa96d0 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -388,6 +388,31 @@ All available procedures are listed below.
CALL sys.reset_consumer(`table` => 'default.T', consumer_id => 'myid', next_snapshot_id => cast(10 as bigint)) |
+
+ | clear_consumers |
+
+ -- Use named argument
+ CALL [catalog.]sys.clear_consumers(`table` => 'identifier', including_consumers => 'includingConsumers', excluding_consumers => 'excludingConsumers')
+ -- Use indexed argument
+ -- clear all consumers in the table
+ CALL [catalog.]sys.clear_consumers('identifier')
+ -- clear some consumers in the table (accept regular expression)
+ CALL [catalog.]sys.clear_consumers('identifier', 'includingConsumers')
+ -- exclude some consumers (accept regular expression)
+ CALL [catalog.]sys.clear_consumers('identifier', 'includingConsumers', 'excludingConsumers')
+ |
+
+ To reset or delete consumer. Arguments:
+ identifier: the target table identifier. Cannot be empty.
+ includingConsumers: consumers to be cleared.
+ excludingConsumers: consumers which not to be cleared.
+ |
+ CALL sys.clear_consumers(`table` => 'default.T')
+ CALL sys.clear_consumers(`table` => 'default.T', including_consumers => 'myid.*')
+ CALL sys.clear_consumers(table => 'default.T', including_consumers => '', excluding_consumers => 'myid1.*')
+ CALL sys.clear_consumers(table => 'default.T', including_consumers => 'myid.*', excluding_consumers => 'myid1.*')
+ |
+
| rollback_to |
diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md
index 2978cd68e350..66802ac28e60 100644
--- a/docs/content/spark/procedures.md
+++ b/docs/content/spark/procedures.md
@@ -338,6 +338,25 @@ This section introduce all available spark procedures about paimon.
-- delete consumer
CALL sys.reset_consumer(table => 'default.T', consumerId => 'myid')
|
+
+
+ | clear_consumers |
+
+ To clear consumers. Arguments:
+ identifier: the target table identifier. Cannot be empty.
+ includingConsumers: consumers to be cleared.
+ excludingConsumers: consumers which not to be cleared.
+ |
+
+ -- clear all consumers in the table
+ CALL sys.clear_consumers(table => 'default.T')
+ -- clear some consumers in the table (accept regular expression)
+ CALL sys.clear_consumers(table => 'default.T', includingConsumers => 'myid.*')
+ -- clear all consumers except excludingConsumers in the table (accept regular expression)
+ CALL sys.clear_consumers(table => 'default.T', includingConsumers => '', excludingConsumers => 'myid1.*')
+ -- clear all consumers with includingConsumers and excludingConsumers (accept regular expression)
+ CALL sys.clear_consumers(table => 'default.T', includingConsumers => 'myid.*', excludingConsumers => 'myid1.*')
+ |
| mark_partition_done |
diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
index 839143b68f93..4762238b6ff4 100644
--- a/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/ConsumerManager.java
@@ -32,6 +32,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
@@ -107,6 +108,35 @@ public void expire(LocalDateTime expireDateTime) {
}
}
+ /** Clear consumers. */
+ public void clearConsumers(Pattern includingPattern, Pattern excludingPattern) {
+ try {
+ listVersionedFileStatus(fileIO, consumerDirectory(), CONSUMER_PREFIX)
+ .forEach(
+ fileStatus -> {
+ String consumerName =
+ fileStatus
+ .getPath()
+ .getName()
+ .substring(CONSUMER_PREFIX.length());
+ boolean shouldClear =
+ includingPattern.matcher(consumerName).matches();
+ if (excludingPattern != null) {
+ shouldClear =
+ shouldClear
+ && !excludingPattern
+ .matcher(consumerName)
+ .matches();
+ }
+ if (shouldClear) {
+ fileIO.deleteQuietly(fileStatus.getPath());
+ }
+ });
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
/** Get all consumer. */
public Map consumers() throws IOException {
Map consumers = new HashMap<>();
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java
new file mode 100644
index 000000000000..a3f6713f4cdb
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.regex.Pattern;
+
+/**
+ * Clear consumers procedure. Usage:
+ *
+ *
+ * -- NOTE: use '' as placeholder for optional arguments
+ *
+ * -- clear all consumers in the table
+ * CALL sys.clear_consumers('tableId')
+ *
+ * -- clear some consumers in the table (accept regular expression)
+ * CALL sys.clear_consumers('tableId', 'includingConsumers')
+ *
+ * -- exclude some consumers (accept regular expression)
+ * CALL sys.clear_consumers('tableId', 'includingConsumers', 'excludingConsumers')
+ *
+ */
+public class ClearConsumersProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "clear_consumers";
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ String includingConsumers,
+ String excludingConsumers)
+ throws Catalog.TableNotExistException {
+ FileStoreTable fileStoreTable =
+ (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
+ ConsumerManager consumerManager =
+ new ConsumerManager(
+ fileStoreTable.fileIO(),
+ fileStoreTable.location(),
+ fileStoreTable.snapshotManager().branch());
+
+ Pattern includingPattern =
+ StringUtils.isNullOrWhitespaceOnly(includingConsumers)
+ ? Pattern.compile(".*")
+ : Pattern.compile(includingConsumers);
+ Pattern excludingPattern =
+ StringUtils.isNullOrWhitespaceOnly(excludingConsumers)
+ ? null
+ : Pattern.compile(excludingConsumers);
+ consumerManager.clearConsumers(includingPattern, excludingPattern);
+
+ return new String[] {"Success"};
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext, String tableId, String includingConsumers)
+ throws Catalog.TableNotExistException {
+ return call(procedureContext, tableId, includingConsumers, null);
+ }
+
+ public String[] call(ProcedureContext procedureContext, String tableId)
+ throws Catalog.TableNotExistException {
+ return call(procedureContext, tableId, null, null);
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ClearConsumerAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ClearConsumerAction.java
new file mode 100644
index 000000000000..37708ecd9330
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ClearConsumerAction.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/** Clear consumers action for Flink. */
+public class ClearConsumerAction extends TableActionBase {
+
+ private String includingConsumers;
+ private String excludingConsumers;
+
+ protected ClearConsumerAction(
+ String databaseName, String tableName, Map catalogConfig) {
+ super(databaseName, tableName, catalogConfig);
+ }
+
+ public ClearConsumerAction withIncludingConsumers(@Nullable String includingConsumers) {
+ this.includingConsumers = includingConsumers;
+ return this;
+ }
+
+ public ClearConsumerAction withExcludingConsumers(@Nullable String excludingConsumers) {
+ this.excludingConsumers = excludingConsumers;
+ return this;
+ }
+
+ @Override
+ public void run() throws Exception {
+ FileStoreTable dataTable = (FileStoreTable) table;
+ ConsumerManager consumerManager =
+ new ConsumerManager(
+ dataTable.fileIO(),
+ dataTable.location(),
+ dataTable.snapshotManager().branch());
+
+ Pattern includingPattern =
+ StringUtils.isNullOrWhitespaceOnly(includingConsumers)
+ ? Pattern.compile(".*")
+ : Pattern.compile(includingConsumers);
+ Pattern excludingPattern =
+ StringUtils.isNullOrWhitespaceOnly(excludingConsumers)
+ ? null
+ : Pattern.compile(excludingConsumers);
+ consumerManager.clearConsumers(includingPattern, excludingPattern);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ClearConsumerActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ClearConsumerActionFactory.java
new file mode 100644
index 000000000000..f027c30be184
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ClearConsumerActionFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Optional;
+
+/** Factory to create {@link ClearConsumerAction}. */
+public class ClearConsumerActionFactory implements ActionFactory {
+
+ public static final String IDENTIFIER = "clear_consumers";
+
+ private static final String INCLUDING_CONSUMERS = "including_consumers";
+ private static final String EXCLUDING_CONSUMERS = "excluding_consumers";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Optional create(MultipleParameterToolAdapter params) {
+ ClearConsumerAction action =
+ new ClearConsumerAction(
+ params.getRequired(DATABASE),
+ params.getRequired(TABLE),
+ catalogConfigMap(params));
+
+ if (params.has(INCLUDING_CONSUMERS)) {
+ action.withIncludingConsumers(params.get(INCLUDING_CONSUMERS));
+ }
+
+ if (params.has(EXCLUDING_CONSUMERS)) {
+ action.withExcludingConsumers(params.get(EXCLUDING_CONSUMERS));
+ }
+
+ return Optional.of(action);
+ }
+
+ @Override
+ public void printHelp() {
+ System.out.println(
+ "Action \"clear_consumers\" clear consumers with including consumers and excluding consumers.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " clear_consumers --warehouse --database "
+ + "--table [--including_consumers --excluding_consumers ]");
+
+ System.out.println();
+ System.out.println("Note:");
+ System.out.println(
+ " use '' as placeholder for including_consumers if you want to clear all consumers except excludingConsumers in the table.");
+ System.out.println();
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java
new file mode 100644
index 000000000000..de4c371d30a3
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ClearConsumersProcedure.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+import java.util.regex.Pattern;
+
+/**
+ * Clear consumers procedure. Usage:
+ *
+ *
+ * -- NOTE: use '' as placeholder for optional arguments
+ *
+ * -- clear all consumers in the table
+ * CALL sys.clear_consumers('tableId')
+ *
+ * -- clear some consumers in the table (accept regular expression)
+ * CALL sys.clear_consumers('tableId', 'includingConsumers')
+ *
+ * -- exclude some consumers (accept regular expression)
+ * CALL sys.clear_consumers('tableId', 'includingConsumers', 'excludingConsumers')
+ *
+ */
+public class ClearConsumersProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "clear_consumers";
+
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(
+ name = "including_consumers",
+ type = @DataTypeHint("STRING"),
+ isOptional = true),
+ @ArgumentHint(
+ name = "excluding_consumers",
+ type = @DataTypeHint("STRING"),
+ isOptional = true)
+ })
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ String includingConsumers,
+ String excludingConsumers)
+ throws Catalog.TableNotExistException {
+ FileStoreTable fileStoreTable =
+ (FileStoreTable) catalog.getTable(Identifier.fromString(tableId));
+ ConsumerManager consumerManager =
+ new ConsumerManager(
+ fileStoreTable.fileIO(),
+ fileStoreTable.location(),
+ fileStoreTable.snapshotManager().branch());
+
+ Pattern includingPattern =
+ StringUtils.isNullOrWhitespaceOnly(includingConsumers)
+ ? Pattern.compile(".*")
+ : Pattern.compile(includingConsumers);
+ Pattern excludingPattern =
+ StringUtils.isNullOrWhitespaceOnly(excludingConsumers)
+ ? null
+ : Pattern.compile(excludingConsumers);
+ consumerManager.clearConsumers(includingPattern, excludingPattern);
+
+ return new String[] {"Success"};
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index db932bb38f9c..6f6becf85fc7 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -44,6 +44,7 @@ org.apache.paimon.flink.action.RepairActionFactory
org.apache.paimon.flink.action.RewriteFileIndexActionFactory
org.apache.paimon.flink.action.ExpireSnapshotsActionFactory
org.apache.paimon.flink.action.RemoveUnexistingFilesActionFactory
+org.apache.paimon.flink.action.ClearConsumerActionFactory
### procedure factories
org.apache.paimon.flink.procedure.CompactDatabaseProcedure
@@ -84,3 +85,4 @@ org.apache.paimon.flink.procedure.CloneProcedure
org.apache.paimon.flink.procedure.CompactManifestProcedure
org.apache.paimon.flink.procedure.RefreshObjectTableProcedure
org.apache.paimon.flink.procedure.RemoveUnexistingFilesProcedure
+org.apache.paimon.flink.procedure.ClearConsumersProcedure
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
index 6fb8c81eb744..b24e964f0cff 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ConsumerActionITCase.java
@@ -312,4 +312,195 @@ public void testResetBranchConsumer(String invoker) throws Exception {
Optional consumer3 = consumerManager.consumer("myid");
assertThat(consumer3).isNotPresent();
}
+
+ @ParameterizedTest
+ @Timeout(120)
+ @ValueSource(strings = {"action", "procedure_indexed", "procedure_named"})
+ public void testClearConsumers(String invoker) throws Exception {
+ init(warehouse);
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()},
+ new String[] {"pk1", "col1"});
+ FileStoreTable table =
+ createFileStoreTable(
+ rowType,
+ Collections.emptyList(),
+ Collections.singletonList("pk1"),
+ Collections.emptyList(),
+ Collections.emptyMap());
+
+ StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser);
+ write = writeBuilder.newWrite();
+ commit = writeBuilder.newCommit();
+
+ // 3 snapshots
+ writeData(rowData(1L, BinaryString.fromString("Hi")));
+ writeData(rowData(2L, BinaryString.fromString("Hello")));
+ writeData(rowData(3L, BinaryString.fromString("Paimon")));
+
+ // use consumer streaming read table
+ BlockingIterator iterator1 =
+ testStreamingRead(
+ "SELECT * FROM `"
+ + tableName
+ + "` /*+ OPTIONS('consumer-id'='myid1_1','consumer.expiration-time'='3h') */",
+ Arrays.asList(
+ changelogRow("+I", 1L, "Hi"),
+ changelogRow("+I", 2L, "Hello"),
+ changelogRow("+I", 3L, "Paimon")));
+
+ ConsumerManager consumerManager = new ConsumerManager(table.fileIO(), table.location());
+ while (!consumerManager.consumer("myid1_1").isPresent()) {
+ Thread.sleep(1000);
+ }
+ iterator1.close();
+
+ // use consumer streaming read table
+ BlockingIterator iterator2 =
+ testStreamingRead(
+ "SELECT * FROM `"
+ + tableName
+ + "` /*+ OPTIONS('consumer-id'='myid1_2','consumer.expiration-time'='3h') */",
+ Arrays.asList(
+ changelogRow("+I", 1L, "Hi"),
+ changelogRow("+I", 2L, "Hello"),
+ changelogRow("+I", 3L, "Paimon")));
+
+ while (!consumerManager.consumer("myid1_2").isPresent()) {
+ Thread.sleep(1000);
+ }
+ iterator2.close();
+
+ // use consumer streaming read table
+ BlockingIterator iterator3 =
+ testStreamingRead(
+ "SELECT * FROM `"
+ + tableName
+ + "` /*+ OPTIONS('consumer-id'='myid2','consumer.expiration-time'='3h') */",
+ Arrays.asList(
+ changelogRow("+I", 1L, "Hi"),
+ changelogRow("+I", 2L, "Hello"),
+ changelogRow("+I", 3L, "Paimon")));
+
+ while (!consumerManager.consumer("myid2").isPresent()) {
+ Thread.sleep(1000);
+ }
+ iterator3.close();
+
+ Optional consumer1 = consumerManager.consumer("myid1_1");
+ Optional consumer2 = consumerManager.consumer("myid1_2");
+ Optional consumer3 = consumerManager.consumer("myid2");
+ assertThat(consumer1).isPresent();
+ assertThat(consumer2).isPresent();
+ assertThat(consumer3).isPresent();
+
+ List args =
+ Arrays.asList(
+ "clear_consumers",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--including_consumers",
+ "",
+ "--excluding_consumers",
+ "myid1.+");
+
+ // clear all consumers except the excluding_consumers in the table
+ switch (invoker) {
+ case "action":
+ createAction(ClearConsumerAction.class, args).run();
+ break;
+ case "procedure_indexed":
+ executeSQL(
+ String.format(
+ "CALL sys.clear_consumers('%s.%s', '', 'myid1.+')",
+ database, tableName));
+ break;
+ case "procedure_named":
+ executeSQL(
+ String.format(
+ "CALL sys.clear_consumers(`table` => '%s.%s', including_consumers => '', excluding_consumers => 'myid1.+')",
+ database, tableName));
+ break;
+ default:
+ throw new UnsupportedOperationException(invoker);
+ }
+
+ Optional consumer4 = consumerManager.consumer("myid1_1");
+ Optional consumer5 = consumerManager.consumer("myid1_2");
+ Optional consumer6 = consumerManager.consumer("myid2");
+ assertThat(consumer4).isPresent();
+ assertThat(consumer5).isPresent();
+ assertThat(consumer6).isNotPresent();
+
+ args =
+ Arrays.asList(
+ "clear_consumers",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--including_consumers",
+ "myid1_1");
+
+ // clear consumers in the table that meet the including_consumers expression
+ switch (invoker) {
+ case "action":
+ createAction(ClearConsumerAction.class, args).run();
+ break;
+ case "procedure_indexed":
+ executeSQL(
+ String.format(
+ "CALL sys.clear_consumers('%s.%s', 'myid1_1')",
+ database, tableName));
+ break;
+ case "procedure_named":
+ executeSQL(
+ String.format(
+ "CALL sys.clear_consumers(`table` => '%s.%s', including_consumers => 'myid1_1')",
+ database, tableName));
+ break;
+ default:
+ throw new UnsupportedOperationException(invoker);
+ }
+
+ Optional consumer7 = consumerManager.consumer("myid1_1");
+ Optional consumer8 = consumerManager.consumer("myid1_2");
+ Optional consumer9 = consumerManager.consumer("myid2");
+ assertThat(consumer7).isNotPresent();
+ assertThat(consumer8).isPresent();
+ assertThat(consumer9).isNotPresent();
+
+ // clear all consumers in the table
+ switch (invoker) {
+ case "action":
+ createAction(ClearConsumerAction.class, args.subList(0, 7)).run();
+ break;
+ case "procedure_indexed":
+ executeSQL(String.format("CALL sys.clear_consumers('%s.%s')", database, tableName));
+ break;
+ case "procedure_named":
+ executeSQL(
+ String.format(
+ "CALL sys.clear_consumers(`table` => '%s.%s')",
+ database, tableName));
+ break;
+ default:
+ throw new UnsupportedOperationException(invoker);
+ }
+
+ Optional consumer10 = consumerManager.consumer("myid1_1");
+ Optional consumer11 = consumerManager.consumer("myid1_2");
+ Optional consumer12 = consumerManager.consumer("myid2");
+ assertThat(consumer10).isNotPresent();
+ assertThat(consumer11).isNotPresent();
+ assertThat(consumer12).isNotPresent();
+ }
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index 06f747f606c3..eff62cad96c4 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -18,6 +18,7 @@
package org.apache.paimon.spark;
+import org.apache.paimon.spark.procedure.ClearConsumersProcedure;
import org.apache.paimon.spark.procedure.CompactManifestProcedure;
import org.apache.paimon.spark.procedure.CompactProcedure;
import org.apache.paimon.spark.procedure.CreateBranchProcedure;
@@ -100,6 +101,7 @@ private static Map> initProcedureBuilders() {
procedureBuilders.put("mark_partition_done", MarkPartitionDoneProcedure::builder);
procedureBuilders.put("compact_manifest", CompactManifestProcedure::builder);
procedureBuilders.put("refresh_object_table", RefreshObjectTableProcedure::builder);
+ procedureBuilders.put("clear_consumers", ClearConsumersProcedure::builder);
return procedureBuilders.build();
}
}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java
new file mode 100644
index 000000000000..cdde1c6f8350
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ClearConsumersProcedure.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.consumer.ConsumerManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.regex.Pattern;
+
+import static org.apache.spark.sql.types.DataTypes.StringType;
+
+/**
+ * Clear consumers procedure. Usage:
+ *
+ *
+ * -- NOTE: use '' as placeholder for optional arguments
+ *
+ * -- clear all consumers in the table
+ * CALL sys.clear_consumers('tableId')
+ *
+ * -- clear some consumers in the table (accept regular expression)
+ * CALL sys.clear_consumers('tableId', 'includingConsumers')
+ *
+ * -- exclude some consumers (accept regular expression)
+ * CALL sys.clear_consumers('tableId', 'includingConsumers', 'excludingConsumers')
+ *
+ */
+public class ClearConsumersProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.required("table", StringType),
+ ProcedureParameter.optional("includingConsumers", StringType),
+ ProcedureParameter.optional("excludingConsumers", StringType)
+ };
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("result", DataTypes.BooleanType, true, Metadata.empty())
+ });
+
+ protected ClearConsumersProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
+ String includingConsumers = args.isNullAt(1) ? null : args.getString(1);
+ String excludingConsumers = args.isNullAt(2) ? null : args.getString(2);
+ Pattern includingPattern =
+ StringUtils.isNullOrWhitespaceOnly(includingConsumers)
+ ? Pattern.compile(".*")
+ : Pattern.compile(includingConsumers);
+ Pattern excludingPattern =
+ StringUtils.isNullOrWhitespaceOnly(excludingConsumers)
+ ? null
+ : Pattern.compile(excludingConsumers);
+
+ return modifyPaimonTable(
+ tableIdent,
+ table -> {
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ ConsumerManager consumerManager =
+ new ConsumerManager(
+ fileStoreTable.fileIO(),
+ fileStoreTable.location(),
+ fileStoreTable.snapshotManager().branch());
+ consumerManager.clearConsumers(includingPattern, excludingPattern);
+
+ InternalRow outputRow = newInternalRow(true);
+ return new InternalRow[] {outputRow};
+ });
+ }
+
+ public static ProcedureBuilder builder() {
+ return new Builder() {
+ @Override
+ public ClearConsumersProcedure doBuild() {
+ return new ClearConsumersProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+ return "ClearConsumersProcedure";
+ }
+}