diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index f60d0ec91009..aa5dcccc3b18 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -930,6 +930,12 @@
Enum |
What frequency is used to generate tags.
Possible values:- "daily": Generate a tag every day.
- "hourly": Generate a tag every hour.
- "two-hours": Generate a tag every two hours.
|
+
+ tag.creation-period-duration |
+ (none) |
+ Duration |
+ The period duration for tag auto create periods.If user set it, tag.creation-period would be invalid. |
+
tag.default-time-retained |
(none) |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index efd886501266..75ca93b26e89 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1242,6 +1242,13 @@ public class CoreOptions implements Serializable {
.defaultValue(TagPeriodFormatter.WITH_DASHES)
.withDescription("The date format for tag periods.");
+ public static final ConfigOption TAG_PERIOD_DURATION =
+ key("tag.creation-period-duration")
+ .durationType()
+ .noDefaultValue()
+ .withDescription(
+ "The period duration for tag auto create periods.If user set it, tag.creation-period would be invalid.");
+
public static final ConfigOption TAG_NUM_RETAINED_MAX =
key("tag.num-retained-max")
.intType()
@@ -2238,6 +2245,10 @@ public TagPeriodFormatter tagPeriodFormatter() {
return options.get(TAG_PERIOD_FORMATTER);
}
+ public Optional tagPeriodDuration() {
+ return options.getOptional(TAG_PERIOD_DURATION);
+ }
+
@Nullable
public Integer tagNumRetainedMax() {
return options.get(TAG_NUM_RETAINED_MAX);
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
index c0fbe718c8fa..761c3c297315 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
@@ -31,6 +31,7 @@
import static java.time.temporal.ChronoField.DAY_OF_MONTH;
import static java.time.temporal.ChronoField.HOUR_OF_DAY;
+import static java.time.temporal.ChronoField.MINUTE_OF_HOUR;
import static java.time.temporal.ChronoField.MONTH_OF_YEAR;
import static java.time.temporal.ChronoField.YEAR;
import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -60,6 +61,16 @@ public interface TagPeriodHandler {
.toFormatter()
.withResolverStyle(ResolverStyle.LENIENT);
+ DateTimeFormatter MINUTE_FORMATTER =
+ new DateTimeFormatterBuilder()
+ .appendValue(YEAR, 1, 10, SignStyle.NORMAL)
+ .appendValue(MONTH_OF_YEAR, 2, 2, SignStyle.NORMAL)
+ .appendValue(DAY_OF_MONTH, 2, 2, SignStyle.NORMAL)
+ .appendValue(HOUR_OF_DAY, 2, 2, SignStyle.NORMAL)
+ .appendValue(MINUTE_OF_HOUR, 2, 2, SignStyle.NORMAL)
+ .toFormatter()
+ .withResolverStyle(ResolverStyle.LENIENT);
+
DateTimeFormatter DAY_FORMATTER =
new DateTimeFormatterBuilder()
.appendValue(YEAR, 1, 10, SignStyle.NORMAL)
@@ -218,7 +229,31 @@ protected DateTimeFormatter formatter() {
}
}
+ /** Period duration {@link TagPeriodHandler}. */
+ class PeriodDurationTagPeriodHandler extends BaseTagPeriodHandler {
+
+ Duration periodDuration;
+
+ public PeriodDurationTagPeriodHandler(Duration duration) {
+ this.periodDuration = duration;
+ }
+
+ @Override
+ protected Duration onePeriod() {
+ return periodDuration;
+ }
+
+ @Override
+ protected DateTimeFormatter formatter() {
+ return MINUTE_FORMATTER;
+ }
+ }
+
static TagPeriodHandler create(CoreOptions options) {
+ if (options.tagPeriodDuration().isPresent()) {
+ return new PeriodDurationTagPeriodHandler(options.tagPeriodDuration().get());
+ }
+
switch (options.tagCreationPeriod()) {
case DAILY:
return new DailyTagPeriodHandler(options.tagPeriodFormatter());
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/system/PeriodDurationTagsTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/system/PeriodDurationTagsTableTest.java
new file mode 100644
index 000000000000..713ff131494e
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/system/PeriodDurationTagsTableTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.system;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.manifest.ManifestCommittable;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.time.LocalDateTime;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link TagsTable}. */
+public class PeriodDurationTagsTableTest extends TableTestBase {
+
+ private static final String tableName = "3TagTestTable";
+ private TagsTable tagsTable;
+ private FileStoreTable table;
+
+ @BeforeEach
+ void before() throws Exception {
+ Identifier identifier = identifier(tableName);
+ Schema schema =
+ Schema.newBuilder()
+ .column("product_id", DataTypes.INT())
+ .column("price", DataTypes.INT())
+ .column("sales", DataTypes.INT())
+ .primaryKey("product_id")
+ .option("tag.automatic-creation", "watermark")
+ .option("tag.creation-period-duration", "120 s")
+ .build();
+ catalog.createTable(identifier, schema, true);
+ table = (FileStoreTable) catalog.getTable(identifier);
+ TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
+
+ commit.commit(
+ new ManifestCommittable(
+ 0,
+ Timestamp.fromLocalDateTime(LocalDateTime.parse("2025-01-07T12:00:01"))
+ .getMillisecond()));
+ commit.commit(
+ new ManifestCommittable(
+ 1,
+ Timestamp.fromLocalDateTime(LocalDateTime.parse("2025-01-07T15:00:01"))
+ .getMillisecond()));
+
+ tagsTable = (TagsTable) catalog.getTable(identifier(tableName + "$tags"));
+ }
+
+ @Test
+ void testCreateTagsWithCustomDuration() throws Exception {
+ List result = read(tagsTable);
+ assertThat(result.size()).isEqualTo(2);
+ assertThat(result.get(0).getString(0).toString()).isEqualTo("202501071158");
+ assertThat(result.get(1).getString(0).toString()).isEqualTo("202501071458");
+ }
+
+ @Override
+ public void after() throws IOException {
+ table.deleteTag("202501071158");
+ table.deleteTag("202501071458");
+ }
+}