From 403e0a0abf99b844bd3cca08e429fdeac99621e5 Mon Sep 17 00:00:00 2001 From: Askwang <135721692+Askwang@users.noreply.github.com> Date: Fri, 6 Dec 2024 16:54:45 +0800 Subject: [PATCH] add conf --- .../generated/core_configuration.html | 6 ++ .../java/org/apache/paimon/CoreOptions.java | 10 +++ .../org/apache/paimon/AbstractFileStore.java | 3 +- .../paimon/operation/PartitionExpire.java | 27 +++++--- .../procedure/ExpirePartitionsProcedure.java | 5 +- .../flink/action/ExpirePartitionsAction.java | 3 +- .../procedure/ExpirePartitionsProcedure.java | 5 +- .../ExpirePartitionsProcedureITCase.java | 37 +++++++++++ .../procedure/ExpirePartitionsProcedure.java | 5 +- .../ExpirePartitionsProcedureTest.scala | 65 +++++++++++++++++++ 10 files changed, 150 insertions(+), 16 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 6fb2c72650fe..7d6bacccb026 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -593,6 +593,12 @@ Duration The check interval of partition expiration. + +
partition.expiration-max-num
+ 100 + Integer + The default deleted num of partition expiration. +
partition.expiration-strategy
values-time diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 765d5a1e32d6..8aebf2f289a0 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -809,6 +809,12 @@ public class CoreOptions implements Serializable { .defaultValue(Duration.ofHours(1)) .withDescription("The check interval of partition expiration."); + public static final ConfigOption PARTITION_EXPIRATION_MAX_NUM = + key("partition.expiration-max-num") + .intType() + .defaultValue(100) + .withDescription("The default deleted num of partition expiration."); + public static final ConfigOption PARTITION_TIMESTAMP_FORMATTER = key("partition.timestamp-formatter") .stringType() @@ -2126,6 +2132,10 @@ public Duration partitionExpireCheckInterval() { return options.get(PARTITION_EXPIRATION_CHECK_INTERVAL); } + public int partitionExpireMaxNum() { + return options.get(PARTITION_EXPIRATION_MAX_NUM); + } + public PartitionExpireStrategy partitionExpireStrategy() { return options.get(PARTITION_EXPIRATION_STRATEGY); } diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 1a538ad89e47..54f554aa46d3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -309,7 +309,8 @@ public PartitionExpire newPartitionExpire(String commitUser) { newScan(), newCommit(commitUser), metastoreClient, - options.endInputCheckPartitionExpire()); + options.endInputCheckPartitionExpire(), + options.partitionExpireMaxNum()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java index 62a9b796476a..d432a37dfd9c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java @@ -54,7 +54,7 @@ public class PartitionExpire { private LocalDateTime lastCheck; private final PartitionExpireStrategy strategy; private final boolean endInputCheckPartitionExpire; - private int maxExpires; + private int maxExpireNum; public PartitionExpire( Duration expirationTime, @@ -63,7 +63,8 @@ public PartitionExpire( FileStoreScan scan, FileStoreCommit commit, @Nullable MetastoreClient metastoreClient, - boolean endInputCheckPartitionExpire) { + boolean endInputCheckPartitionExpire, + int maxExpireNum) { this.expirationTime = expirationTime; this.checkInterval = checkInterval; this.strategy = strategy; @@ -72,7 +73,7 @@ public PartitionExpire( this.metastoreClient = metastoreClient; this.lastCheck = LocalDateTime.now(); this.endInputCheckPartitionExpire = endInputCheckPartitionExpire; - this.maxExpires = Integer.MAX_VALUE; + this.maxExpireNum = maxExpireNum; } public PartitionExpire( @@ -81,8 +82,17 @@ public PartitionExpire( PartitionExpireStrategy strategy, FileStoreScan scan, FileStoreCommit commit, - @Nullable MetastoreClient metastoreClient) { - this(expirationTime, checkInterval, strategy, scan, commit, metastoreClient, false); + @Nullable MetastoreClient metastoreClient, + int maxExpireNum) { + this( + expirationTime, + checkInterval, + strategy, + scan, + commit, + metastoreClient, + false, + maxExpireNum); } public PartitionExpire withLock(Lock lock) { @@ -90,8 +100,8 @@ public PartitionExpire withLock(Lock lock) { return this; } - public PartitionExpire withMaxExpires(int maxExpires) { - this.maxExpires = maxExpires; + public PartitionExpire withMaxExpireNum(int maxExpireNum) { + this.maxExpireNum = maxExpireNum; return this; } @@ -145,6 +155,7 @@ private List> doExpire( List> expired = new ArrayList<>(); if (!expiredPartValues.isEmpty()) { + // convert partition value to partition string, and limit the partition num expired = convertToPartitionString(expiredPartValues); LOG.info("Expire Partitions: {}", expired); if (metastoreClient != null) { @@ -175,7 +186,7 @@ private List> convertToPartitionString( .sorted() .map(s -> s.split(DELIMITER)) .map(strategy::toPartitionString) - .limit(Math.min(expiredPartValues.size(), maxExpires)) + .limit(Math.min(expiredPartValues.size(), maxExpireNum)) .collect(Collectors.toList()); } } diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java index c0e5a65c49ef..1c0d73cfbe38 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -93,9 +93,10 @@ public String[] call( .catalogEnvironment() .metastoreClientFactory()) .map(MetastoreClient.Factory::create) - .orElse(null)); + .orElse(null), + fileStore.options().partitionExpireMaxNum()); if (maxExpires != null) { - partitionExpire.withMaxExpires(maxExpires); + partitionExpire.withMaxExpireNum(maxExpires); } List> expired = partitionExpire.expire(Long.MAX_VALUE); return expired == null || expired.isEmpty() diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java index 9528bc137d6f..0fa17e1a8ddb 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ExpirePartitionsAction.java @@ -72,7 +72,8 @@ public ExpirePartitionsAction( .catalogEnvironment() .metastoreClientFactory()) .map(MetastoreClient.Factory::create) - .orElse(null)); + .orElse(null), + fileStore.options().partitionExpireMaxNum()); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java index ee6075a927d3..ce282c6800cc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedure.java @@ -97,9 +97,10 @@ public String identifier() { .catalogEnvironment() .metastoreClientFactory()) .map(MetastoreClient.Factory::create) - .orElse(null)); + .orElse(null), + fileStore.options().partitionExpireMaxNum()); if (maxExpires != null) { - partitionExpire.withMaxExpires(maxExpires); + partitionExpire.withMaxExpireNum(maxExpires); } List> expired = partitionExpire.expire(Long.MAX_VALUE); return expired == null || expired.isEmpty() diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java index 2d1fb6dde78a..a40968e067bc 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpirePartitionsProcedureITCase.java @@ -415,6 +415,43 @@ public void testNullPartitionExpire() { .containsExactly("No expired partitions."); } + @Test + public void testExpirePartitionsWithDefaultNum() throws Exception { + sql( + "CREATE TABLE T (" + + " k STRING," + + " dt STRING," + + " PRIMARY KEY (k, dt) NOT ENFORCED" + + ") PARTITIONED BY (dt) WITH (" + + " 'bucket' = '1'," + + " 'partition.expiration-max-num'='2'" + + ")"); + FileStoreTable table = paimonTable("T"); + + sql("INSERT INTO T VALUES ('a', '2024-06-01')"); + sql("INSERT INTO T VALUES ('b', '2024-06-02')"); + sql("INSERT INTO T VALUES ('c', '2024-06-03')"); + // This partition never expires. + sql("INSERT INTO T VALUES ('Never-expire', '9999-09-09')"); + Function consumerReadResult = + (InternalRow row) -> row.getString(0) + ":" + row.getString(1); + + assertThat(read(table, consumerReadResult)) + .containsExactlyInAnyOrder( + "a:2024-06-01", "b:2024-06-02", "c:2024-06-03", "Never-expire:9999-09-09"); + + assertThat( + callExpirePartitions( + "CALL sys.expire_partitions(" + + "`table` => 'default.T'" + + ", expiration_time => '1 d'" + + ", timestamp_formatter => 'yyyy-MM-dd')")) + .containsExactlyInAnyOrder("dt=2024-06-01", "dt=2024-06-02"); + + assertThat(read(table, consumerReadResult)) + .containsExactlyInAnyOrder("c:2024-06-03", "Never-expire:9999-09-09"); + } + /** Return a list of expired partitions. */ public List callExpirePartitions(String callSql) { return sql(callSql).stream() diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java index 7b388227e5a4..e3a53d2bd2ef 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpirePartitionsProcedure.java @@ -107,9 +107,10 @@ public InternalRow[] call(InternalRow args) { .catalogEnvironment() .metastoreClientFactory()) .map(MetastoreClient.Factory::create) - .orElse(null)); + .orElse(null), + fileStore.options().partitionExpireMaxNum()); if (maxExpires != null) { - partitionExpire.withMaxExpires(maxExpires); + partitionExpire.withMaxExpireNum(maxExpires); } List> expired = partitionExpire.expire(Long.MAX_VALUE); return expired == null || expired.isEmpty() diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala index 4561e532f538..9f0d23dc9379 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpirePartitionsProcedureTest.scala @@ -551,4 +551,69 @@ class ExpirePartitionsProcedureTest extends PaimonSparkTestBase with StreamTest } } } + + test("Paimon Procedure: expire partitions with default num") { + failAfter(streamingTimeout) { + withTempDir { + checkpointDir => + spark.sql( + s""" + |CREATE TABLE T (k STRING, pt STRING) + |TBLPROPERTIES ('primary-key'='k,pt', 'bucket'='1', 'partition.expiration-max-num'='2') + |PARTITIONED BY (pt) + |""".stripMargin) + val location = loadTable("T").location().toString + + val inputData = MemoryStream[(String, String)] + val stream = inputData + .toDS() + .toDF("k", "pt") + .writeStream + .option("checkpointLocation", checkpointDir.getCanonicalPath) + .foreachBatch { + (batch: Dataset[Row], _: Long) => + batch.write.format("paimon").mode("append").save(location) + } + .start() + + val query = () => spark.sql("SELECT * FROM T") + + try { + // snapshot-1 + inputData.addData(("a", "2024-06-01")) + stream.processAllAvailable() + + // snapshot-2 + inputData.addData(("b", "2024-06-02")) + stream.processAllAvailable() + + // snapshot-3 + inputData.addData(("c", "2024-06-03")) + stream.processAllAvailable() + + // This partition never expires. + inputData.addData(("Never-expire", "9999-09-09")) + stream.processAllAvailable() + + checkAnswer( + query(), + Row("a", "2024-06-01") :: Row("b", "2024-06-02") :: Row("c", "2024-06-03") :: Row( + "Never-expire", + "9999-09-09") :: Nil) + // call expire_partitions. + checkAnswer( + spark.sql( + "CALL paimon.sys.expire_partitions(table => 'test.T', expiration_time => '1 d'" + + ", timestamp_formatter => 'yyyy-MM-dd')"), + Row("pt=2024-06-01") :: Row("pt=2024-06-02") :: Nil + ) + + checkAnswer(query(), Row("c", "2024-06-03") :: Row("Never-expire", "9999-09-09") :: Nil) + + } finally { + stream.stop() + } + } + } + } }