Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions docs/content/flink/sql-write.md
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,26 @@ CREATE TABLE my_partitioned_table (
'partition.time-interval'='1 d',
'partition.idle-time-to-done'='15 m',
'partition.mark-done-action'='done-partition'
-- You can also customize a PartitionMarkDoneAction to mark the partition completed.
-- 'partition.mark-done-action'='done-partition,custom',
-- 'partition.mark-done-action.custom.class'='org.apache.paimon.CustomPartitionMarkDoneAction'
);
```
Define a class CustomPartitionMarkDoneAction to implement the PartitionMarkDoneAction interface.
```java
package org.apache.paimon;

public class CustomPartitionMarkDoneAction implements PartitionMarkDoneAction {

@Override
public void markDone(String partition) {
// do something.
}

@Override
public void close() {}
}
```

1. Firstly, you need to define the time parser of the partition and the time interval between partitions in order to
determine when the partition can be properly marked done.
Expand Down
8 changes: 7 additions & 1 deletion docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,13 @@
<td><h5>partition.mark-done-action</h5></td>
<td style="word-wrap: break-word;">"success-file"</td>
<td>String</td>
<td>Action to mark a partition done is to notify the downstream application that the partition has finished writing, the partition is ready to be read.<br />1. 'success-file': add '_success' file to directory.<br />2. 'done-partition': add 'xxx.done' partition to metastore.<br />3. 'mark-event': mark partition event to metastore.<br />Both can be configured at the same time: 'done-partition,success-file,mark-event'.</td>
<td>Action to mark a partition done is to notify the downstream application that the partition has finished writing, the partition is ready to be read.<br />1. 'success-file': add '_success' file to directory.<br />2. 'done-partition': add 'xxx.done' partition to metastore.<br />3. 'mark-event': mark partition event to metastore.<br />4. 'custom': use policy class to create a mark-partition policy.<br />Both can be configured at the same time: 'done-partition,success-file,mark-event,custom'.</td>
</tr>
<tr>
<td><h5>partition.mark-done-action.custom.class</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The partition mark done class for implement PartitionMarkDoneAction interface. Only work in custom mark-done-action.</td>
</tr>
<tr>
<td><h5>partition.timestamp-formatter</h5></td>
Expand Down
17 changes: 16 additions & 1 deletion paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -1185,9 +1185,20 @@ public class CoreOptions implements Serializable {
.text("3. 'mark-event': mark partition event to metastore.")
.linebreak()
.text(
"Both can be configured at the same time: 'done-partition,success-file,mark-event'.")
"4. 'custom': use policy class to create a mark-partition policy.")
.linebreak()
.text(
"Both can be configured at the same time: 'done-partition,success-file,mark-event,custom'.")
.build());

public static final ConfigOption<String> PARTITION_MARK_DONE_CUSTOM_CLASS =
key("partition.mark-done-action.custom.class")
.stringType()
.noDefaultValue()
.withDescription(
"The partition mark done class for implement"
+ " PartitionMarkDoneAction interface. Only work in custom mark-done-action.");

public static final ConfigOption<Boolean> METASTORE_PARTITIONED_TABLE =
key("metastore.partitioned-table")
.booleanType()
Expand Down Expand Up @@ -2189,6 +2200,10 @@ public String partitionTimestampPattern() {
return options.get(PARTITION_TIMESTAMP_PATTERN);
}

public String partitionMarkDoneCustomClass() {
return options.get(PARTITION_MARK_DONE_CUSTOM_CLASS);
}

public String consumerId() {
String consumerId = options.get(CONSUMER_ID);
if (consumerId != null && consumerId.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.StringUtils;

import java.io.Closeable;
import java.util.Arrays;
Expand All @@ -29,36 +30,63 @@

import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** Action to mark partitions done. */
public interface PartitionMarkDoneAction extends Closeable {

String SUCCESS_FILE = "success-file";
String DONE_PARTITION = "done-partition";
String MARK_EVENT = "mark-event";
String CUSTOM = "custom";

void markDone(String partition) throws Exception;

static List<PartitionMarkDoneAction> createActions(
FileStoreTable fileStoreTable, CoreOptions options) {
ClassLoader cl, FileStoreTable fileStoreTable, CoreOptions options) {
return Arrays.stream(options.toConfiguration().get(PARTITION_MARK_DONE_ACTION).split(","))
.map(
action -> {
switch (action) {
case "success-file":
switch (action.toLowerCase()) {
case SUCCESS_FILE:
return new SuccessFileMarkDoneAction(
fileStoreTable.fileIO(), fileStoreTable.location());
case "done-partition":
case DONE_PARTITION:
return new AddDonePartitionAction(
createMetastoreClient(fileStoreTable, options));
case "mark-event":
case MARK_EVENT:
return new MarkPartitionDoneEventAction(
createMetastoreClient(fileStoreTable, options));
case CUSTOM:
return generateCustomMarkDoneAction(cl, options);
default:
throw new UnsupportedOperationException(action);
}
})
.collect(Collectors.toList());
}

static PartitionMarkDoneAction generateCustomMarkDoneAction(
ClassLoader cl, CoreOptions options) {
if (StringUtils.isNullOrWhitespaceOnly(options.partitionMarkDoneCustomClass())) {
throw new IllegalArgumentException(
String.format(
"You need to set [%s] when you add [%s] mark done action in your property [%s].",
PARTITION_MARK_DONE_CUSTOM_CLASS.key(),
CUSTOM,
PARTITION_MARK_DONE_ACTION.key()));
}
String customClass = options.partitionMarkDoneCustomClass();
try {
return (PartitionMarkDoneAction) cl.loadClass(customClass).newInstance();
} catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
throw new RuntimeException(
"Can not create new instance for custom class from " + customClass, e);
}
}

static MetastoreClient createMetastoreClient(FileStoreTable table, CoreOptions options) {
MetastoreClient.Factory metastoreClientFactory =
table.catalogEnvironment().metastoreClientFactory();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ public String[] call(
FileStoreTable fileStoreTable = (FileStoreTable) table;
CoreOptions coreOptions = fileStoreTable.coreOptions();
List<PartitionMarkDoneAction> actions =
PartitionMarkDoneAction.createActions(fileStoreTable, coreOptions);
PartitionMarkDoneAction.createActions(
getClass().getClassLoader(), fileStoreTable, coreOptions);

List<String> partitionPaths =
PartitionPathUtils.generatePartitionPaths(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public MarkPartitionDoneAction(
@Override
public void run() throws Exception {
List<PartitionMarkDoneAction> actions =
PartitionMarkDoneAction.createActions(fileStoreTable, fileStoreTable.coreOptions());
PartitionMarkDoneAction.createActions(
getClass().getClassLoader(), fileStoreTable, fileStoreTable.coreOptions());

List<String> partitionPaths =
PartitionPathUtils.generatePartitionPaths(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public String[] call(ProcedureContext procedureContext, String tableId, String p
FileStoreTable fileStoreTable = (FileStoreTable) table;
CoreOptions coreOptions = fileStoreTable.coreOptions();
List<PartitionMarkDoneAction> actions =
PartitionMarkDoneAction.createActions(fileStoreTable, coreOptions);
PartitionMarkDoneAction.createActions(
procedureContext.getClass().getClassLoader(), fileStoreTable, coreOptions);

List<String> partitionPaths =
PartitionPathUtils.generatePartitionPaths(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public static PartitionListeners create(Committer.Context context, FileStoreTabl

// partition mark done
PartitionMarkDone.create(
context.getClass().getClassLoader(),
context.streamingCheckpointEnabled(),
context.isRestored(),
context.stateStore(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public class PartitionMarkDone implements PartitionListener {
private final boolean waitCompaction;

public static Optional<PartitionMarkDone> create(
ClassLoader cl,
boolean isStreaming,
boolean isRestored,
OperatorStateStore stateStore,
Expand All @@ -75,7 +76,7 @@ public static Optional<PartitionMarkDone> create(
PartitionMarkDoneTrigger.create(coreOptions, isRestored, stateStore);

List<PartitionMarkDoneAction> actions =
PartitionMarkDoneAction.createActions(table, coreOptions);
PartitionMarkDoneAction.createActions(cl, table, coreOptions);

// if batch read skip level 0 files, we should wait compaction to mark done
// otherwise, some data may not be readable, and there might be data delays
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.flink.sink.partition.MockCustomPartitionMarkDoneAction;
import org.apache.paimon.fs.Path;
import org.apache.paimon.partition.actions.PartitionMarkDoneAction;
import org.apache.paimon.partition.file.SuccessFile;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.StreamWriteBuilder;
Expand All @@ -36,8 +38,11 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_ACTION;
import static org.apache.paimon.CoreOptions.PARTITION_MARK_DONE_CUSTOM_CLASS;
import static org.assertj.core.api.Assertions.assertThat;

/** IT cases for {@link MarkPartitionDoneAction}. */
Expand Down Expand Up @@ -103,7 +108,7 @@ public void testPartitionMarkDoneWithSinglePartitionKey(boolean hasPk, String in

@ParameterizedTest
@MethodSource("testArguments")
public void testDropPartitionWithMultiplePartitionKey(boolean hasPk, String invoker)
public void testPartitionMarkDoneWithMultiplePartitionKey(boolean hasPk, String invoker)
throws Exception {
FileStoreTable table = prepareTable(hasPk);

Expand Down Expand Up @@ -149,7 +154,72 @@ public void testDropPartitionWithMultiplePartitionKey(boolean hasPk, String invo
assertThat(successFile2).isNotNull();
}

@ParameterizedTest
@MethodSource("testArguments")
public void testCustomPartitionMarkDoneAction(boolean hasPk, String invoker) throws Exception {

Map<String, String> options = new HashMap<>();
options.put(
PARTITION_MARK_DONE_ACTION.key(),
PartitionMarkDoneAction.SUCCESS_FILE + "," + PartitionMarkDoneAction.CUSTOM);
options.put(
PARTITION_MARK_DONE_CUSTOM_CLASS.key(),
MockCustomPartitionMarkDoneAction.class.getName());

FileStoreTable table = prepareTable(hasPk, options);

switch (invoker) {
case "action":
createAction(
MarkPartitionDoneAction.class,
"mark_partition_done",
"--warehouse",
warehouse,
"--database",
database,
"--table",
tableName,
"--partition",
"partKey0=0,partKey1=1",
"--partition",
"partKey0=1,partKey1=0")
.run();
break;
case "procedure_indexed":
executeSQL(
String.format(
"CALL sys.mark_partition_done('%s.%s', 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')",
database, tableName));
break;
case "procedure_named":
executeSQL(
String.format(
"CALL sys.mark_partition_done(`table` => '%s.%s', partitions => 'partKey0=0,partKey1=1;partKey0=1,partKey1=0')",
database, tableName));
break;
default:
throw new UnsupportedOperationException(invoker);
}

Path successPath1 = new Path(table.location(), "partKey0=0/partKey1=1/_SUCCESS");
SuccessFile successFile1 = SuccessFile.safelyFromPath(table.fileIO(), successPath1);
assertThat(successFile1).isNotNull();

Path successPath2 = new Path(table.location(), "partKey0=1/partKey1=0/_SUCCESS");
SuccessFile successFile2 = SuccessFile.safelyFromPath(table.fileIO(), successPath2);
assertThat(successFile2).isNotNull();

assertThat(MockCustomPartitionMarkDoneAction.getMarkedDonePartitions())
.containsExactlyInAnyOrder("partKey0=0/partKey1=1/", "partKey0=1/partKey1=0/");
}

private FileStoreTable prepareTable(boolean hasPk) throws Exception {
return prepareTable(hasPk, Collections.emptyMap());
}

private FileStoreTable prepareTable(boolean hasPk, Map<String, String> options)
throws Exception {

FileStoreTable table =
createFileStoreTable(
ROW_TYPE,
Expand All @@ -158,7 +228,7 @@ private FileStoreTable prepareTable(boolean hasPk) throws Exception {
? Arrays.asList("partKey0", "partKey1", "dt")
: Collections.emptyList(),
hasPk ? Collections.emptyList() : Collections.singletonList("dt"),
new HashMap<>());
options);
SnapshotManager snapshotManager = table.snapshotManager();
StreamWriteBuilder streamWriteBuilder =
table.newStreamWriteBuilder().withCommitUser(commitUser);
Expand Down
Loading
Loading