diff --git a/docs/content/flink/sql-write.md b/docs/content/flink/sql-write.md
index 6abbfa01756c..b5950c669988 100644
--- a/docs/content/flink/sql-write.md
+++ b/docs/content/flink/sql-write.md
@@ -261,8 +261,26 @@ CREATE TABLE my_partitioned_table (
'partition.time-interval'='1 d',
'partition.idle-time-to-done'='15 m',
'partition.mark-done-action'='done-partition'
+ -- You can also customize a PartitionMarkDoneAction to mark the partition completed.
+ -- 'partition.mark-done-action'='done-partition,custom',
+ -- 'partition.mark-done-action.custom.class'='org.apache.paimon.CustomPartitionMarkDoneAction'
);
```
+Define a class CustomPartitionMarkDoneAction to implement the PartitionMarkDoneAction interface.
+```java
+package org.apache.paimon;
+
+public class CustomPartitionMarkDoneAction implements PartitionMarkDoneAction {
+
+ @Override
+ public void markDone(String partition) {
+ // do something.
+ }
+
+ @Override
+ public void close() {}
+}
+```
1. Firstly, you need to define the time parser of the partition and the time interval between partitions in order to
determine when the partition can be properly marked done.
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index f60d0ec91009..6d1fa744b414 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -639,7 +639,13 @@
partition.mark-done-action |
"success-file" |
String |
- Action to mark a partition done is to notify the downstream application that the partition has finished writing, the partition is ready to be read. 1. 'success-file': add '_success' file to directory. 2. 'done-partition': add 'xxx.done' partition to metastore. 3. 'mark-event': mark partition event to metastore. Both can be configured at the same time: 'done-partition,success-file,mark-event'. |
+ Action to mark a partition done is to notify the downstream application that the partition has finished writing, the partition is ready to be read. 1. 'success-file': add '_success' file to directory. 2. 'done-partition': add 'xxx.done' partition to metastore. 3. 'mark-event': mark partition event to metastore. 4. 'custom': use policy class to create a mark-partition policy. Both can be configured at the same time: 'done-partition,success-file,mark-event,custom'. |
+
+
+ partition.mark-done-action.custom.class |
+ (none) |
+ String |
+ The partition mark done class for implement PartitionMarkDoneAction interface. Only work in custom mark-done-action. |
partition.timestamp-formatter |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index efd886501266..a8dfb7b99cd8 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1185,9 +1185,20 @@ public class CoreOptions implements Serializable {
.text("3. 'mark-event': mark partition event to metastore.")
.linebreak()
.text(
- "Both can be configured at the same time: 'done-partition,success-file,mark-event'.")
+ "4. 'custom': use policy class to create a mark-partition policy.")
+ .linebreak()
+ .text(
+ "Both can be configured at the same time: 'done-partition,success-file,mark-event,custom'.")
.build());
+ public static final ConfigOption PARTITION_MARK_DONE_CUSTOM_CLASS =
+ key("partition.mark-done-action.custom.class")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "The partition mark done class for implement"
+ + " PartitionMarkDoneAction interface. Only work in custom mark-done-action.");
+
public static final ConfigOption METASTORE_PARTITIONED_TABLE =
key("metastore.partitioned-table")
.booleanType()
@@ -2189,6 +2200,10 @@ public String partitionTimestampPattern() {
return options.get(PARTITION_TIMESTAMP_PATTERN);
}
+ public String partitionMarkDoneCustomClass() {
+ return options.get(PARTITION_MARK_DONE_CUSTOM_CLASS);
+ }
+
public String consumerId() {
String consumerId = options.get(CONSUMER_ID);
if (consumerId != null && consumerId.isEmpty()) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
index d6b13a25e270..4bdb49823d52 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/actions/PartitionMarkDoneAction.java
@@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.StringUtils;
import java.io.Closeable;
import java.util.Arrays;
@@ -29,29 +30,37 @@
import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** Action to mark partitions done. */
public interface PartitionMarkDoneAction extends Closeable {
+ String SUCCESS_FILE = "success-file";
+ String DONE_PARTITION = "done-partition";
+ String MARK_EVENT = "mark-event";
+ String CUSTOM = "custom";
+
void markDone(String partition) throws Exception;
static List createActions(
- FileStoreTable fileStoreTable, CoreOptions options) {
+ ClassLoader cl, FileStoreTable fileStoreTable, CoreOptions options) {
return Arrays.stream(options.toConfiguration().get(PARTITION_MARK_DONE_ACTION).split(","))
.map(
action -> {
- switch (action) {
- case "success-file":
+ switch (action.toLowerCase()) {
+ case SUCCESS_FILE:
return new SuccessFileMarkDoneAction(
fileStoreTable.fileIO(), fileStoreTable.location());
- case "done-partition":
+ case DONE_PARTITION:
return new AddDonePartitionAction(
createMetastoreClient(fileStoreTable, options));
- case "mark-event":
+ case MARK_EVENT:
return new MarkPartitionDoneEventAction(
createMetastoreClient(fileStoreTable, options));
+ case CUSTOM:
+ return generateCustomMarkDoneAction(cl, options);
default:
throw new UnsupportedOperationException(action);
}
@@ -59,6 +68,25 @@ static List createActions(
.collect(Collectors.toList());
}
+ static PartitionMarkDoneAction generateCustomMarkDoneAction(
+ ClassLoader cl, CoreOptions options) {
+ if (StringUtils.isNullOrWhitespaceOnly(options.partitionMarkDoneCustomClass())) {
+ throw new IllegalArgumentException(
+ String.format(
+ "You need to set [%s] when you add [%s] mark done action in your property [%s].",
+ PARTITION_MARK_DONE_CUSTOM_CLASS.key(),
+ CUSTOM,
+ PARTITION_MARK_DONE_ACTION.key()));
+ }
+ String customClass = options.partitionMarkDoneCustomClass();
+ try {
+ return (PartitionMarkDoneAction) cl.loadClass(customClass).newInstance();
+ } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
+ throw new RuntimeException(
+ "Can not create new instance for custom class from " + customClass, e);
+ }
+ }
+
static MetastoreClient createMetastoreClient(FileStoreTable table, CoreOptions options) {
MetastoreClient.Factory metastoreClientFactory =
table.catalogEnvironment().metastoreClientFactory();
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
index d70cccf6ba25..22abfb3f3b40 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
@@ -64,7 +64,8 @@ public String[] call(
FileStoreTable fileStoreTable = (FileStoreTable) table;
CoreOptions coreOptions = fileStoreTable.coreOptions();
List actions =
- PartitionMarkDoneAction.createActions(fileStoreTable, coreOptions);
+ PartitionMarkDoneAction.createActions(
+ getClass().getClassLoader(), fileStoreTable, coreOptions);
List partitionPaths =
PartitionPathUtils.generatePartitionPaths(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
index 25cd14af2195..c566af0a1949 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MarkPartitionDoneAction.java
@@ -54,7 +54,8 @@ public MarkPartitionDoneAction(
@Override
public void run() throws Exception {
List actions =
- PartitionMarkDoneAction.createActions(fileStoreTable, fileStoreTable.coreOptions());
+ PartitionMarkDoneAction.createActions(
+ getClass().getClassLoader(), fileStoreTable, fileStoreTable.coreOptions());
List partitionPaths =
PartitionPathUtils.generatePartitionPaths(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
index f0a89a0bb32b..d73553045b18 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MarkPartitionDoneProcedure.java
@@ -72,7 +72,8 @@ public String[] call(ProcedureContext procedureContext, String tableId, String p
FileStoreTable fileStoreTable = (FileStoreTable) table;
CoreOptions coreOptions = fileStoreTable.coreOptions();
List actions =
- PartitionMarkDoneAction.createActions(fileStoreTable, coreOptions);
+ PartitionMarkDoneAction.createActions(
+ procedureContext.getClass().getClassLoader(), fileStoreTable, coreOptions);
List partitionPaths =
PartitionPathUtils.generatePartitionPaths(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
index d190b9ccf39e..cbf14da456d0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionListeners.java
@@ -64,6 +64,7 @@ public static PartitionListeners create(Committer.Context context, FileStoreTabl
// partition mark done
PartitionMarkDone.create(
+ context.getClass().getClassLoader(),
context.streamingCheckpointEnabled(),
context.isRestored(),
context.stateStore(),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
index 8714e0006e7b..6f360c782348 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/partition/PartitionMarkDone.java
@@ -52,6 +52,7 @@ public class PartitionMarkDone implements PartitionListener {
private final boolean waitCompaction;
public static Optional create(
+ ClassLoader cl,
boolean isStreaming,
boolean isRestored,
OperatorStateStore stateStore,
@@ -75,7 +76,7 @@ public static Optional create(
PartitionMarkDoneTrigger.create(coreOptions, isRestored, stateStore);
List actions =
- PartitionMarkDoneAction.createActions(table, coreOptions);
+ PartitionMarkDoneAction.createActions(cl, table, coreOptions);
// if batch read skip level 0 files, we should wait compaction to mark done
// otherwise, some data may not be readable, and there might be data delays
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
index 3c1b73df8cb3..63edb8bb4c98 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MarkPartitionDoneActionITCase.java
@@ -20,7 +20,9 @@
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.flink.sink.partition.MockCustomPartitionMarkDoneAction;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.partition.file.SuccessFile;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
@@ -36,8 +38,11 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Map;
import java.util.stream.Stream;
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS;
import static org.assertj.core.api.Assertions.assertThat;
/** IT cases for {@link MarkPartitionDoneAction}. */
@@ -103,7 +108,7 @@ public void testPartitionMarkDoneWithSinglePartitionKey(boolean hasPk, String in
@ParameterizedTest
@MethodSource("testArguments")
- public void testDropPartitionWithMultiplePartitionKey(boolean hasPk, String invoker)
+ public void testPartitionMarkDoneWithMultiplePartitionKey(boolean hasPk, String invoker)
throws Exception {
FileStoreTable table = prepareTable(hasPk);
@@ -149,7 +154,72 @@ public void testDropPartitionWithMultiplePartitionKey(boolean hasPk, String invo
assertThat(successFile2).isNotNull();
}
+ @ParameterizedTest
+ @MethodSource("testArguments")
+ public void testCustomPartitionMarkDoneAction(boolean hasPk, String invoker) throws Exception {
+
+ Map options = new HashMap<>();
+ options.put(
+ PARTITION_MARK_DONE_ACTION.key(),
+ PartitionMarkDoneAction.SUCCESS_FILE + "," + PartitionMarkDoneAction.CUSTOM);
+ options.put(
+ PARTITION_MARK_DONE_CUSTOM_CLASS.key(),
+ MockCustomPartitionMarkDoneAction.class.getName());
+
+ FileStoreTable table = prepareTable(hasPk, options);
+
+ switch (invoker) {
+ case "action":
+ createAction(
+ MarkPartitionDoneAction.class,
+ "mark_partition_done",
+ "--warehouse",
+ warehouse,
+ "--database",
+ database,
+ "--table",
+ tableName,
+ "--partition",
+ "partKey0=0,partKey1=1",
+ "--partition",
+ "partKey0=1,partKey1=0")
+ .run();
+ break;
+ case "procedure_indexed":
+ executeSQL(
+ String.format(
+ "CALL sys.mark_partition_done('%s.%s', 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')",
+ database, tableName));
+ break;
+ case "procedure_named":
+ executeSQL(
+ String.format(
+ "CALL sys.mark_partition_done(`table` => '%s.%s', partitions => 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')",
+ database, tableName));
+ break;
+ default:
+ throw new UnsupportedOperationException(invoker);
+ }
+
+ Path successPath1 = new Path(table.location(), "partKey0=0/partKey1=1/_SUCCESS");
+ SuccessFile successFile1 = SuccessFile.safelyFromPath(table.fileIO(), successPath1);
+ assertThat(successFile1).isNotNull();
+
+ Path successPath2 = new Path(table.location(), "partKey0=1/partKey1=0/_SUCCESS");
+ SuccessFile successFile2 = SuccessFile.safelyFromPath(table.fileIO(), successPath2);
+ assertThat(successFile2).isNotNull();
+
+ assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions())
+ .containsExactlyInAnyOrder("partKey0=0/partKey1=1/", "partKey0=1/partKey1=0/");
+ }
+
private FileStoreTable prepareTable(boolean hasPk) throws Exception {
+ return prepareTable(hasPk, Collections.emptyMap());
+ }
+
+ private FileStoreTable prepareTable(boolean hasPk, Map options)
+ throws Exception {
+
FileStoreTable table =
createFileStoreTable(
ROW_TYPE,
@@ -158,7 +228,7 @@ private FileStoreTable prepareTable(boolean hasPk) throws Exception {
? Arrays.asList("partKey0", "partKey1", "dt")
: Collections.emptyList(),
hasPk ? Collections.emptyList() : Collections.singletonList("dt"),
- new HashMap<>());
+ options);
SnapshotManager snapshotManager = table.snapshotManager();
StreamWriteBuilder streamWriteBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
new file mode 100644
index 000000000000..d1599b6c57ea
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/CustomPartitionMarkDoneActionTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.partition;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.types.DataTypes;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS;
+import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
+import static org.apache.paimon.flink.sink.partition.PartitionMarkDoneTest.notifyCommits;
+import static org.apache.paimon.partition.actions.PartitionMarkDoneAction.CUSTOM;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for custom PartitionMarkDoneAction. */
+public class CustomPartitionMarkDoneActionTest extends TableTestBase {
+
+ @Test
+ public void testCustomPartitionMarkDoneAction() throws Exception {
+
+ Identifier identifier = identifier("T");
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.INT())
+ .column("c", DataTypes.INT())
+ .partitionKeys("a")
+ .primaryKey("a", "b")
+ .option(PARTITION_MARK_DONE_WHEN_END_INPUT.key(), "true")
+ .option(PARTITION_MARK_DONE_ACTION.key(), "success-file,custom")
+ .build();
+ catalog.createTable(identifier, schema, true);
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+ Path location = table.location();
+ Path successFile = new Path(location, "a=0/_SUCCESS");
+
+ // Throwing the exception, if the parameter 'partition.mark-done-action.custom.class' is not
+ // set.
+ Assertions.assertThatThrownBy(
+ () ->
+ PartitionMarkDone.create(
+ getClass().getClassLoader(),
+ false,
+ false,
+ new PartitionMarkDoneTest.MockOperatorStateStore(),
+ table))
+ .hasMessageContaining(
+ String.format(
+ "You need to set [%s] when you add [%s] mark done action in your property [%s].",
+ PARTITION_MARK_DONE_CUSTOM_CLASS.key(),
+ CUSTOM,
+ PARTITION_MARK_DONE_ACTION.key()));
+
+ // Set parameter 'partition.mark-done-action.custom.class'.
+ catalog.alterTable(
+ identifier,
+ SchemaChange.setOption(
+ PARTITION_MARK_DONE_CUSTOM_CLASS.key(),
+ MockCustomPartitionMarkDoneAction.class.getName()),
+ true);
+
+ FileStoreTable table2 = (FileStoreTable) catalog.getTable(identifier);
+
+ PartitionMarkDone markDone =
+ PartitionMarkDone.create(
+ getClass().getClassLoader(),
+ false,
+ false,
+ new PartitionMarkDoneTest.MockOperatorStateStore(),
+ table2)
+ .get();
+
+ notifyCommits(markDone, false);
+
+ assertThat(table2.fileIO().exists(successFile)).isEqualTo(true);
+
+ assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions().iterator().next())
+ .isEqualTo("a=0/");
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java
new file mode 100644
index 000000000000..f8d9b4034672
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/MockCustomPartitionMarkDoneAction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink.partition;
+
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+/** The class is only applicable for {@link CustomPartitionMarkDoneActionTest}. */
+public class MockCustomPartitionMarkDoneAction implements PartitionMarkDoneAction {
+
+ private static final Set markedDonePartitions = new HashSet<>();
+
+ @Override
+ public void markDone(String partition) {
+ MockCustomPartitionMarkDoneAction.markedDonePartitions.add(partition);
+ }
+
+ public static Set getMarkedDonePartitions() {
+ return MockCustomPartitionMarkDoneAction.markedDonePartitions;
+ }
+
+ @Override
+ public void close() throws IOException {}
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
index 9e5fe7ff9ff7..f737a19fa903 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/partition/PartitionMarkDoneTest.java
@@ -86,7 +86,13 @@ private void innerTest(boolean deletionVectors) throws Exception {
Path location = table.location();
Path successFile = new Path(location, "a=0/_SUCCESS");
PartitionMarkDone markDone =
- PartitionMarkDone.create(false, false, new MockOperatorStateStore(), table).get();
+ PartitionMarkDone.create(
+ getClass().getClassLoader(),
+ false,
+ false,
+ new MockOperatorStateStore(),
+ table)
+ .get();
notifyCommits(markDone, true);
assertThat(table.fileIO().exists(successFile)).isEqualTo(deletionVectors);
@@ -97,7 +103,7 @@ private void innerTest(boolean deletionVectors) throws Exception {
}
}
- private void notifyCommits(PartitionMarkDone markDone, boolean isCompact) {
+ public static void notifyCommits(PartitionMarkDone markDone, boolean isCompact) {
ManifestCommittable committable = new ManifestCommittable(Long.MAX_VALUE);
DataFileMeta file = DataFileTestUtils.newFile();
CommitMessageImpl compactMessage;
@@ -122,7 +128,7 @@ private void notifyCommits(PartitionMarkDone markDone, boolean isCompact) {
markDone.notifyCommittable(singletonList(committable));
}
- private static class MockOperatorStateStore implements OperatorStateStore {
+ public static class MockOperatorStateStore implements OperatorStateStore {
@Override
public BroadcastState getBroadcastState(
@@ -151,7 +157,7 @@ public Set getRegisteredBroadcastStateNames() {
}
}
- private static class MockListState implements ListState {
+ public static class MockListState implements ListState {
private final List backingList = new ArrayList<>();
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedure.java
index ff064e9140da..f5cc349b9b24 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedure.java
@@ -90,7 +90,8 @@ public InternalRow[] call(InternalRow args) {
FileStoreTable fileStoreTable = (FileStoreTable) table;
CoreOptions coreOptions = fileStoreTable.coreOptions();
List actions =
- PartitionMarkDoneAction.createActions(fileStoreTable, coreOptions);
+ PartitionMarkDoneAction.createActions(
+ getClass().getClassLoader(), fileStoreTable, coreOptions);
List partitionPaths =
PartitionPathUtils.generatePartitionPaths(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
index 27d9a0786a56..80dd6ae42577 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/WriteIntoPaimonTable.scala
@@ -74,7 +74,8 @@ case class WriteIntoPaimonTable(
private def markDoneIfNeeded(commitMessages: Seq[CommitMessage]): Unit = {
val coreOptions = table.coreOptions()
if (coreOptions.toConfiguration.get(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT)) {
- val actions = PartitionMarkDoneAction.createActions(table, table.coreOptions())
+ val actions =
+ PartitionMarkDoneAction.createActions(getClass.getClassLoader, table, table.coreOptions())
val partitionComputer = new InternalRowPartitionComputer(
coreOptions.partitionDefaultName,
TypeUtils.project(table.rowType(), table.partitionKeys()),
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala
index 8abc7ddfdae8..5551c75505ac 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MarkPartitionDoneProcedureTest.scala
@@ -19,6 +19,7 @@
package org.apache.paimon.spark.procedure
import org.apache.paimon.fs.Path
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction
import org.apache.paimon.partition.file.SuccessFile
import org.apache.paimon.spark.PaimonSparkTestBase
@@ -58,4 +59,41 @@ class MarkPartitionDoneProcedureTest extends PaimonSparkTestBase {
}
+ test("Paimon procedure: custom partition mark done test") {
+ spark.sql(
+ s"""
+ |CREATE TABLE T (id STRING, name STRING, day STRING)
+ |USING PAIMON
+ |PARTITIONED BY (day)
+ |TBLPROPERTIES (
+ |'primary-key'='day,id',
+ |'partition.mark-done-action'='success-file,custom',
+ |'partition.mark-done-action.custom.class'='${classOf[MockCustomPartitionMarkDoneAction].getName}'
+ |)
+ |""".stripMargin)
+
+ spark.sql(s"INSERT INTO T VALUES ('1', 'a', '2024-07-13')")
+ spark.sql(s"INSERT INTO T VALUES ('2', 'b', '2024-07-14')")
+
+ checkAnswer(
+ spark.sql(
+ "CALL paimon.sys.mark_partition_done(" +
+ "table => 'test.T', partitions => 'day=2024-07-13;day=2024-07-14')"),
+ Row(true) :: Nil)
+
+ val table = loadTable("T")
+
+ val successPath1 = new Path(table.location, "day=2024-07-13/_SUCCESS")
+ val successFile1 = SuccessFile.safelyFromPath(table.fileIO, successPath1)
+ assertThat(successFile1).isNotNull
+
+ val successPath2 = new Path(table.location, "day=2024-07-14/_SUCCESS")
+ val successFile2 = SuccessFile.safelyFromPath(table.fileIO, successPath2)
+ assertThat(successFile2).isNotNull
+
+ assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions.toArray)
+ .containsExactlyInAnyOrder("day=2024-07-14/", "day=2024-07-13/")
+
+ }
+
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MockCustomPartitionMarkDoneAction.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MockCustomPartitionMarkDoneAction.scala
new file mode 100644
index 000000000000..d123d2c1205e
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/MockCustomPartitionMarkDoneAction.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.partition.actions.PartitionMarkDoneAction
+
+import java.util
+
+/** The case class is only applicable for {@link MarkPartitionDoneProcedureTest}. */
+case class MockCustomPartitionMarkDoneAction() extends PartitionMarkDoneAction {
+
+ override def markDone(partition: String): Unit = {
+ MockCustomPartitionMarkDoneAction.add(partition)
+ }
+
+ override def close(): Unit = {}
+}
+
+object MockCustomPartitionMarkDoneAction {
+ val markedDonePartitions = new util.HashSet[String]
+
+ def add(partition: String): Unit = {
+ markedDonePartitions.add(partition)
+ }
+
+ def getMarkedDonePartitions: util.Set[String] = {
+ markedDonePartitions
+ }
+}