diff --git a/docs/content/flink/sql-query.md b/docs/content/flink/sql-query.md
index 89136b0b0635..6cba9f930f5a 100644
--- a/docs/content/flink/sql-query.md
+++ b/docs/content/flink/sql-query.md
@@ -105,6 +105,23 @@ If you want see `DELETE` records, you can use audit_log table:
SELECT * FROM t$audit_log /*+ OPTIONS('incremental-between' = '12,20') */;
```
+### Batch Incremental between Auto-created Tags
+
+You can use `incremental-between` to query incremental changes between two tags. However, an auto-created tag may not
+be created in time when data is delayed.
+
+For example, assume that tags '2024-12-01', '2024-12-02' and '2024-12-04' are auto-created daily. Data for 12/03 is delayed
+and ingested with the data for 12/04. If you want to query the incremental changes between tags and do not know that the tag
+for 12/03 was not created, you will use `incremental-between` with '2024-12-01,2024-12-02', '2024-12-02,2024-12-03' and
+'2024-12-03,2024-12-04' respectively, and you will get an error that the tag '2024-12-03' doesn't exist.
+
+The `incremental-to-auto-tag` option is designed for this scenario. You only need to specify the end tag, and Paimon will
+find the nearest earlier tag and return the changes between them. If the end tag or an earlier tag doesn't exist, the result is empty.
+
+For example, querying 'incremental-to-auto-tag=2024-12-01' or 'incremental-to-auto-tag=2024-12-03' returns an empty result;
+querying 'incremental-to-auto-tag=2024-12-02' returns the changes between 12/01 and 12/02; querying
+'incremental-to-auto-tag=2024-12-04' returns the changes between 12/02 and 12/04.
+
## Streaming Query
By default, Streaming read produces the latest snapshot on the table upon first startup,
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html
index f60d0ec91009..2a5d7c924c3c 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -392,6 +392,12 @@
String |
Read incremental changes between start timestamp (exclusive) and end timestamp, for example, 't1,t2' means changes between timestamp t1 and timestamp t2. |
+
+ incremental-to-auto-tag |
+ (none) |
+ String |
+ Used to specify the auto-created tag to reading incremental changes. |
+
local-merge-buffer-size |
(none) |
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index efd886501266..005199acd862 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1080,6 +1080,13 @@ public class CoreOptions implements Serializable {
"Read incremental changes between start timestamp (exclusive) and end timestamp, "
+ "for example, 't1,t2' means changes between timestamp t1 and timestamp t2.");
+    // Names the auto-created end tag of an incremental read; typed as String so that
+    // options.get(INCREMENTAL_TO_AUTO_TAG) can be returned as a String without a cast.
+    public static final ConfigOption<String> INCREMENTAL_TO_AUTO_TAG =
+            key("incremental-to-auto-tag")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription(
+                            "Used to specify the auto-created tag to reading incremental changes.");
+
public static final ConfigOption END_INPUT_CHECK_PARTITION_EXPIRE =
key("end-input.check-partition-expire")
.booleanType()
@@ -2119,6 +2126,10 @@ public IncrementalBetweenScanMode incrementalBetweenScanMode() {
return options.get(INCREMENTAL_BETWEEN_SCAN_MODE);
}
+ public String incrementalToAutoTag() {
+ return options.get(INCREMENTAL_TO_AUTO_TAG);
+ }
+
public Integer scanManifestParallelism() {
return options.get(SCAN_MANIFEST_PARALLELISM);
}
@@ -2752,7 +2763,8 @@ public static void setDefaultValues(Options options) {
}
if ((options.contains(INCREMENTAL_BETWEEN_TIMESTAMP)
- || options.contains(INCREMENTAL_BETWEEN))
+ || options.contains(INCREMENTAL_BETWEEN)
+ || options.contains(INCREMENTAL_TO_AUTO_TAG))
&& !options.contains(SCAN_MODE)) {
options.set(SCAN_MODE, StartupMode.INCREMENTAL);
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 4bddcdd72d7e..ae24fb09576c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -62,6 +62,7 @@
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP;
+import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG;
import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS;
import static org.apache.paimon.CoreOptions.SCAN_MODE;
import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID;
@@ -278,7 +279,8 @@ private static void validateStartupMode(CoreOptions options) {
SCAN_FILE_CREATION_TIME_MILLIS,
SCAN_TAG_NAME,
INCREMENTAL_BETWEEN_TIMESTAMP,
- INCREMENTAL_BETWEEN),
+ INCREMENTAL_BETWEEN,
+ INCREMENTAL_TO_AUTO_TAG),
Arrays.asList(SCAN_TIMESTAMP_MILLIS, SCAN_TIMESTAMP));
} else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT) {
checkExactOneOptionExistInMode(
@@ -294,14 +296,16 @@ private static void validateStartupMode(CoreOptions options) {
SCAN_TIMESTAMP,
SCAN_FILE_CREATION_TIME_MILLIS,
INCREMENTAL_BETWEEN_TIMESTAMP,
- INCREMENTAL_BETWEEN),
+ INCREMENTAL_BETWEEN,
+ INCREMENTAL_TO_AUTO_TAG),
Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME));
} else if (options.startupMode() == CoreOptions.StartupMode.INCREMENTAL) {
checkExactOneOptionExistInMode(
options,
options.startupMode(),
INCREMENTAL_BETWEEN,
- INCREMENTAL_BETWEEN_TIMESTAMP);
+ INCREMENTAL_BETWEEN_TIMESTAMP,
+ INCREMENTAL_TO_AUTO_TAG);
checkOptionsConflict(
options,
Arrays.asList(
@@ -310,7 +314,10 @@ private static void validateStartupMode(CoreOptions options) {
SCAN_FILE_CREATION_TIME_MILLIS,
SCAN_TIMESTAMP,
SCAN_TAG_NAME),
- Arrays.asList(INCREMENTAL_BETWEEN, INCREMENTAL_BETWEEN_TIMESTAMP));
+ Arrays.asList(
+ INCREMENTAL_BETWEEN,
+ INCREMENTAL_BETWEEN_TIMESTAMP,
+ INCREMENTAL_TO_AUTO_TAG));
} else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
checkOptionExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode());
checkOptionsConflict(
@@ -321,7 +328,8 @@ private static void validateStartupMode(CoreOptions options) {
SCAN_FILE_CREATION_TIME_MILLIS,
SCAN_TAG_NAME,
INCREMENTAL_BETWEEN_TIMESTAMP,
- INCREMENTAL_BETWEEN),
+ INCREMENTAL_BETWEEN,
+ INCREMENTAL_TO_AUTO_TAG),
Collections.singletonList(SCAN_SNAPSHOT_ID));
} else if (options.startupMode() == CoreOptions.StartupMode.FROM_FILE_CREATION_TIME) {
checkOptionExistInMode(
@@ -335,7 +343,8 @@ private static void validateStartupMode(CoreOptions options) {
SCAN_TIMESTAMP_MILLIS,
SCAN_TAG_NAME,
INCREMENTAL_BETWEEN_TIMESTAMP,
- INCREMENTAL_BETWEEN),
+ INCREMENTAL_BETWEEN,
+ INCREMENTAL_TO_AUTO_TAG),
Collections.singletonList(SCAN_FILE_CREATION_TIME_MILLIS));
} else {
checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS, options.startupMode());
@@ -347,6 +356,7 @@ private static void validateStartupMode(CoreOptions options) {
checkOptionNotExistInMode(
options, INCREMENTAL_BETWEEN_TIMESTAMP, options.startupMode());
checkOptionNotExistInMode(options, INCREMENTAL_BETWEEN, options.startupMode());
+ checkOptionNotExistInMode(options, INCREMENTAL_TO_AUTO_TAG, options.startupMode());
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 24c6943f546f..a5810bfc24b2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -20,12 +20,14 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
+import org.apache.paimon.Snapshot;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.Consumer;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreScan;
+import org.apache.paimon.options.Options;
import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner;
import org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotFullStartingScanner;
@@ -44,21 +46,29 @@
import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner;
import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner;
+import org.apache.paimon.tag.Tag;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TagManager;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
+import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** An abstraction layer above {@link FileStoreScan} to provide input split generation. */
public abstract class AbstractDataTableScan implements DataTableScan {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractDataTableScan.class);
+
private final CoreOptions options;
protected final SnapshotReader snapshotReader;
@@ -199,50 +209,87 @@ protected StartingScanner createStartingScanner(boolean isStreaming) {
: new StaticFromSnapshotStartingScanner(snapshotManager, scanSnapshotId);
case INCREMENTAL:
checkArgument(!isStreaming, "Cannot read incremental in streaming mode.");
- Pair incrementalBetween = options.incrementalBetween();
- CoreOptions.IncrementalBetweenScanMode scanType =
- options.incrementalBetweenScanMode();
- ScanMode scanMode;
- switch (scanType) {
- case AUTO:
- scanMode =
- options.changelogProducer() == ChangelogProducer.NONE
- ? ScanMode.DELTA
- : ScanMode.CHANGELOG;
- break;
- case DELTA:
- scanMode = ScanMode.DELTA;
- break;
- case CHANGELOG:
- scanMode = ScanMode.CHANGELOG;
- break;
- default:
- throw new UnsupportedOperationException(
- "Unknown incremental scan type " + scanType.name());
- }
- if (options.toMap().get(CoreOptions.INCREMENTAL_BETWEEN.key()) != null) {
- try {
- return new IncrementalStartingScanner(
- snapshotManager,
- Long.parseLong(incrementalBetween.getLeft()),
- Long.parseLong(incrementalBetween.getRight()),
- scanMode);
- } catch (NumberFormatException e) {
- return new IncrementalTagStartingScanner(
- snapshotManager,
- incrementalBetween.getLeft(),
- incrementalBetween.getRight());
- }
- } else {
- return new IncrementalTimeStampStartingScanner(
- snapshotManager,
- Long.parseLong(incrementalBetween.getLeft()),
- Long.parseLong(incrementalBetween.getRight()),
- scanMode);
- }
+ return createIncrementalStartingScanner(snapshotManager);
default:
throw new UnsupportedOperationException(
"Unknown startup mode " + startupMode.name());
}
}
+
+    /**
+     * Builds the {@link StartingScanner} used for batch incremental reads.
+     *
+     * <p>The scanner is chosen from whichever incremental option is configured:
+     *
+     * <ul>
+     *   <li>{@link CoreOptions#INCREMENTAL_BETWEEN}: if both values resolve to existing tags, the
+     *       changes between the two tags are read; otherwise both values must parse as snapshot
+     *       ids.
+     *   <li>{@link CoreOptions#INCREMENTAL_BETWEEN_TIMESTAMP}: both values are parsed as longs and
+     *       read by timestamp.
+     *   <li>{@link CoreOptions#INCREMENTAL_TO_AUTO_TAG}: delegates to {@link
+     *       IncrementalTagStartingScanner#create}.
+     * </ul>
+     *
+     * @param snapshotManager snapshot manager of the table being scanned
+     * @return the starting scanner matching the configured incremental option
+     * @throws IllegalArgumentException if the incremental-between values are neither two existing
+     *     tags nor two snapshot ids, or if the end tag's snapshot id is not larger than the start
+     *     tag's
+     * @throws UnsupportedOperationException if none of the incremental options is set
+     */
+    private StartingScanner createIncrementalStartingScanner(SnapshotManager snapshotManager) {
+        CoreOptions.IncrementalBetweenScanMode scanType = options.incrementalBetweenScanMode();
+        ScanMode scanMode;
+        switch (scanType) {
+            case AUTO:
+                // Without a changelog producer only delta files are available.
+                scanMode =
+                        options.changelogProducer() == ChangelogProducer.NONE
+                                ? ScanMode.DELTA
+                                : ScanMode.CHANGELOG;
+                break;
+            case DELTA:
+                scanMode = ScanMode.DELTA;
+                break;
+            case CHANGELOG:
+                scanMode = ScanMode.CHANGELOG;
+                break;
+            default:
+                throw new UnsupportedOperationException(
+                        "Unknown incremental scan type " + scanType.name());
+        }
+
+        Options conf = options.toConfiguration();
+        if (conf.contains(INCREMENTAL_BETWEEN)) {
+            Pair<String, String> incrementalBetween = options.incrementalBetween();
+            // Tags are only looked up in this branch, so the TagManager is created here.
+            TagManager tagManager =
+                    new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
+            // Tags take precedence: values that name existing tags are never parsed as ids.
+            Optional<Tag> startTag = tagManager.get(incrementalBetween.getLeft());
+            Optional<Tag> endTag = tagManager.get(incrementalBetween.getRight());
+            if (startTag.isPresent() && endTag.isPresent()) {
+                Snapshot start = startTag.get().trimToSnapshot();
+                Snapshot end = endTag.get().trimToSnapshot();
+
+                LOG.info(
+                        "{} start and end are parsed to tag with snapshot id {} to {}.",
+                        INCREMENTAL_BETWEEN.key(),
+                        start.id(),
+                        end.id());
+
+                if (end.id() <= start.id()) {
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s",
+                                    incrementalBetween.getRight(),
+                                    end.id(),
+                                    incrementalBetween.getLeft(),
+                                    start.id()));
+                }
+                return new IncrementalTagStartingScanner(snapshotManager, start, end);
+            } else {
+                long startId, endId;
+                try {
+                    startId = Long.parseLong(incrementalBetween.getLeft());
+                    endId = Long.parseLong(incrementalBetween.getRight());
+                } catch (NumberFormatException e) {
+                    // Keep the parse failure as the cause so the offending value stays visible.
+                    throw new IllegalArgumentException(
+                            String.format(
+                                    "Didn't find two tags for start '%s' and end '%s', and they are not two snapshot Ids. "
+                                            + "Please set two tags or two snapshot Ids.",
+                                    incrementalBetween.getLeft(), incrementalBetween.getRight()),
+                            e);
+                }
+                return new IncrementalStartingScanner(snapshotManager, startId, endId, scanMode);
+            }
+        } else if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)) {
+            Pair<String, String> incrementalBetween = options.incrementalBetween();
+            return new IncrementalTimeStampStartingScanner(
+                    snapshotManager,
+                    Long.parseLong(incrementalBetween.getLeft()),
+                    Long.parseLong(incrementalBetween.getRight()),
+                    scanMode);
+        } else if (conf.contains(CoreOptions.INCREMENTAL_TO_AUTO_TAG)) {
+            String endTag = options.incrementalToAutoTag();
+            return IncrementalTagStartingScanner.create(snapshotManager, endTag, options);
+        } else {
+            throw new UnsupportedOperationException("Unknown incremental read mode.");
+        }
+    }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/EmptyResultStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/EmptyResultStartingScanner.java
new file mode 100644
index 000000000000..fc38e272d517
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/EmptyResultStartingScanner.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.source.snapshot;
+
+import org.apache.paimon.utils.SnapshotManager;
+
+/** A starting scanner whose {@code scan} always returns a {@code NoSnapshot} result. */
+public class EmptyResultStartingScanner extends AbstractStartingScanner {
+
+    EmptyResultStartingScanner(SnapshotManager snapshotManager) {
+        super(snapshotManager);
+    }
+
+    /** Ignores the given reader and reports that there is no snapshot to read. */
+    @Override
+    public Result scan(SnapshotReader reader) {
+        return new NoSnapshot();
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
index e08ac9f44c60..55212b1c9a93 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java
@@ -18,10 +18,21 @@
package org.apache.paimon.table.source.snapshot;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.tag.Tag;
+import org.apache.paimon.tag.TagPeriodHandler;
+import org.apache.paimon.tag.TagTimeExtractor;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
+import java.time.LocalDateTime;
+import java.util.Optional;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
+import static org.apache.paimon.utils.Preconditions.checkState;
+
/** {@link StartingScanner} for incremental changes by tag. */
public class IncrementalTagStartingScanner extends AbstractStartingScanner {
@@ -29,18 +40,10 @@ public class IncrementalTagStartingScanner extends AbstractStartingScanner {
private final Snapshot end;
public IncrementalTagStartingScanner(
- SnapshotManager snapshotManager, String startTagName, String endTagName) {
+ SnapshotManager snapshotManager, Snapshot start, Snapshot end) {
super(snapshotManager);
- TagManager tagManager =
- new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
- start = tagManager.getOrThrow(startTagName).trimToSnapshot();
- end = tagManager.getOrThrow(endTagName).trimToSnapshot();
- if (end.id() <= start.id()) {
- throw new IllegalArgumentException(
- String.format(
- "Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s",
- endTagName, end.id(), startTagName, start.id()));
- }
+ this.start = start;
+ this.end = end;
this.startingSnapshotId = start.id();
}
@@ -48,4 +51,62 @@ public IncrementalTagStartingScanner(
public Result scan(SnapshotReader reader) {
return StartingScanner.fromPlan(reader.withSnapshot(end).readIncrementalDiff(start));
}
+
+    /**
+     * Creates a scanner for the changes between the given auto-created end tag and the nearest
+     * existing auto-created tag before it.
+     *
+     * <p>An {@link EmptyResultStartingScanner} is returned when the end tag does not exist, or when
+     * no earlier tag exists down to the tag period of the earliest snapshot.
+     *
+     * @param snapshotManager snapshot manager of the table being scanned
+     * @param endTagName name of the auto-created tag that ends the incremental range
+     * @param options table options from which the tag time extractor and period handler are built
+     * @return a scanner over the incremental range, or an empty-result scanner as described above
+     */
+    public static AbstractStartingScanner create(
+            SnapshotManager snapshotManager, String endTagName, CoreOptions options) {
+        TagTimeExtractor extractor = TagTimeExtractor.createForAutoTag(options);
+        checkNotNull(
+                extractor,
+                "Table's tag creation mode doesn't support '%s' scan mode.",
+                // Use the option key: formatting the ConfigOption object itself would print its
+                // toString() instead of the option name.
+                CoreOptions.INCREMENTAL_TO_AUTO_TAG.key());
+        TagPeriodHandler periodHandler = TagPeriodHandler.create(options);
+        checkArgument(
+                periodHandler.isAutoTag(endTagName),
+                "Specified tag '%s' is not an auto-created tag.",
+                endTagName);
+
+        TagManager tagManager =
+                new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
+
+        Optional<Tag> endTag = tagManager.get(endTagName);
+        if (!endTag.isPresent()) {
+            return new EmptyResultStartingScanner(snapshotManager);
+        }
+
+        Snapshot end = endTag.get().trimToSnapshot();
+
+        Snapshot earliestSnapshot = snapshotManager.earliestSnapshot();
+        checkState(earliestSnapshot != null, "No tags can be found.");
+
+        // NOTE(review): the lower bound of the backward search comes from the earliest snapshot,
+        // so tags older than that bound are never considered - confirm this is intended when
+        // snapshots expire before tags.
+        LocalDateTime earliestTime =
+                extractor
+                        .extract(earliestSnapshot.timeMillis(), earliestSnapshot.watermark())
+                        .orElseThrow(
+                                () ->
+                                        new RuntimeException(
+                                                "Cannot get valid tag time from the earliest snapshot."));
+        LocalDateTime earliestTagTime = periodHandler.normalizeToPreviousTag(earliestTime);
+
+        LocalDateTime endTagTime = periodHandler.tagToTime(endTagName);
+        LocalDateTime previousTagTime = periodHandler.previousTagTime(endTagTime);
+
+        // Walk back one tag period at a time until an existing tag is found or the lower bound
+        // is passed.
+        Snapshot start = null;
+        while (!previousTagTime.isBefore(earliestTagTime)) {
+            String previousTagName = periodHandler.timeToTag(previousTagTime);
+            Optional<Tag> previousTag = tagManager.get(previousTagName);
+            if (previousTag.isPresent()) {
+                start = previousTag.get().trimToSnapshot();
+                break;
+            }
+            previousTagTime = periodHandler.previousTagTime(previousTagTime);
+        }
+
+        if (start == null) {
+            return new EmptyResultStartingScanner(snapshotManager);
+        }
+
+        return new IncrementalTagStartingScanner(snapshotManager, start, end);
+    }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
index c0fbe718c8fa..4934e26b22f6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
+++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java
@@ -88,6 +88,8 @@ public interface TagPeriodHandler {
LocalDateTime nextTagTime(LocalDateTime time);
+    /** Returns the tag time preceding {@code time}; the base handler steps back one period. */
+    LocalDateTime previousTagTime(LocalDateTime time);
+
boolean isAutoTag(String tagName);
/** Base implementation of {@link TagPeriodHandler}. */
@@ -127,6 +129,11 @@ public LocalDateTime nextTagTime(LocalDateTime time) {
return time.plus(onePeriod());
}
+ @Override
+ public LocalDateTime previousTagTime(LocalDateTime time) {
+ return time.minus(onePeriod());
+ }
+
@Override
public boolean isAutoTag(String tagName) {
try {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
index 43fbe2b6460a..0214f0d99e2b 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java
@@ -20,18 +20,28 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.TagManager;
import org.junit.jupiter.api.Test;
+import java.time.LocalDateTime;
+import java.util.Collections;
import java.util.List;
import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN;
+import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG;
import static org.apache.paimon.data.BinaryString.fromString;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
@@ -301,4 +311,88 @@ public void testAppendTableTag() throws Exception {
assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG2")))
.containsExactlyInAnyOrder(GenericRow.of(1, 1, 2));
}
+
+    @Test
+    public void testIncrementalToTagFirst() throws Exception {
+        Identifier tableId = identifier("T");
+        Schema tableSchema =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.STRING())
+                        .primaryKey("a")
+                        .option("bucket", "1")
+                        .build();
+        catalog.createTable(tableId, tableSchema, false);
+        FileStoreTable fileStoreTable = (FileStoreTable) catalog.getTable(tableId);
+
+        // One snapshot per write: snapshots 1, 2 and 3.
+        write(fileStoreTable, GenericRow.of(1, fromString("a")));
+        write(fileStoreTable, GenericRow.of(2, fromString("b")));
+        write(fileStoreTable, GenericRow.of(3, fromString("c")));
+
+        // Tag names look like snapshot ids on purpose: tag "3" points at snapshot 2.
+        fileStoreTable.createTag("1", 1);
+        fileStoreTable.createTag("3", 2);
+
+        // Only row 2 is expected, so "1,3" must have been resolved as tags, not snapshot ids.
+        assertThat(read(fileStoreTable, Pair.of(INCREMENTAL_BETWEEN, "1,3")))
+                .containsExactlyInAnyOrder(GenericRow.of(2, fromString("b")));
+    }
+
+ @Test
+ public void testIncrementalToAutoTag() throws Exception {
+ Identifier identifier = identifier("T");
+ Schema schema =
+ Schema.newBuilder()
+ .column("a", DataTypes.INT())
+ .column("b", DataTypes.STRING())
+ .primaryKey("a")
+ .option("bucket", "1")
+ .option("tag.automatic-creation", "watermark")
+ .option("tag.creation-period", "daily")
+ .build();
+ catalog.createTable(identifier, schema, false);
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
+
+ TableWriteImpl> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false);
+ TagManager tagManager = table.tagManager();
+
+ write.write(GenericRow.of(1, BinaryString.fromString("a")));
+ List commitMessages = write.prepareCommit(false, 0);
+ commit.commit(
+ new ManifestCommittable(
+ 0,
+ utcMills("2024-12-02T10:00:00"),
+ Collections.emptyMap(),
+ commitMessages));
+
+ write.write(GenericRow.of(2, BinaryString.fromString("b")));
+ commitMessages = write.prepareCommit(false, 1);
+ commit.commit(
+ new ManifestCommittable(
+ 1,
+ utcMills("2024-12-03T10:00:00"),
+ Collections.emptyMap(),
+ commitMessages));
+
+ write.write(GenericRow.of(3, BinaryString.fromString("c")));
+ commitMessages = write.prepareCommit(false, 2);
+ commit.commit(
+ new ManifestCommittable(
+ 2,
+ utcMills("2024-12-05T10:00:00"),
+ Collections.emptyMap(),
+ commitMessages));
+
+ assertThat(tagManager.allTagNames()).containsOnly("2024-12-01", "2024-12-02", "2024-12-04");
+
+ assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-01"))).isEmpty();
+ assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-02")))
+ .containsExactly(GenericRow.of(2, BinaryString.fromString("b")));
+ assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-03"))).isEmpty();
+ assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-04")))
+ .containsExactly(GenericRow.of(3, BinaryString.fromString("c")));
+ }
+
+    /** Parses an ISO local date-time string and returns the millisecond value of its Timestamp. */
+    private static long utcMills(String timestamp) {
+        LocalDateTime dateTime = LocalDateTime.parse(timestamp);
+        return Timestamp.fromLocalDateTime(dateTime).getMillisecond();
+    }
}