diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java index 55212b1c9a93..8c4b5d5cec01 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java @@ -22,20 +22,25 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.tag.Tag; import org.apache.paimon.tag.TagPeriodHandler; -import org.apache.paimon.tag.TagTimeExtractor; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.time.LocalDateTime; +import java.util.List; import java.util.Optional; +import java.util.stream.Collectors; import static org.apache.paimon.utils.Preconditions.checkArgument; -import static org.apache.paimon.utils.Preconditions.checkNotNull; -import static org.apache.paimon.utils.Preconditions.checkState; /** {@link StartingScanner} for incremental changes by tag. */ public class IncrementalTagStartingScanner extends AbstractStartingScanner { + private static final Logger LOG = LoggerFactory.getLogger(IncrementalTagStartingScanner.class); + private final Snapshot start; private final Snapshot end; @@ -54,11 +59,6 @@ public Result scan(SnapshotReader reader) { public static AbstractStartingScanner create( SnapshotManager snapshotManager, String endTagName, CoreOptions options) { - TagTimeExtractor extractor = TagTimeExtractor.createForAutoTag(options); - checkNotNull( - extractor, - "Table's tag creation mode doesn't support '%s' scan mode.", - CoreOptions.INCREMENTAL_TO_AUTO_TAG); TagPeriodHandler periodHandler = TagPeriodHandler.create(options); checkArgument( periodHandler.isAutoTag(endTagName), @@ -70,42 +70,27 @@ public static AbstractStartingScanner create( Optional endTag = tagManager.get(endTagName); if (!endTag.isPresent()) { + LOG.info("Tag {} doesn't exist.", endTagName); return new EmptyResultStartingScanner(snapshotManager); } - Snapshot end = endTag.get().trimToSnapshot(); - Snapshot earliestSnapshot = snapshotManager.earliestSnapshot(); - checkState(earliestSnapshot != null, "No tags can be found."); - - LocalDateTime earliestTime = - extractor - .extract(earliestSnapshot.timeMillis(), earliestSnapshot.watermark()) - .orElseThrow( - () -> - new RuntimeException( - "Cannot get valid tag time from the earliest snapshot.")); - LocalDateTime earliestTagTime = periodHandler.normalizeToPreviousTag(earliestTime); - LocalDateTime endTagTime = periodHandler.tagToTime(endTagName); - LocalDateTime previousTagTime = periodHandler.previousTagTime(endTagTime); - - Snapshot start = null; - while (previousTagTime.isAfter(earliestTagTime) - || previousTagTime.isEqual(earliestTagTime)) { - String previousTagName = periodHandler.timeToTag(previousTagTime); - Optional previousTag = tagManager.get(previousTagName); - if (previousTag.isPresent()) { - start = previousTag.get().trimToSnapshot(); - break; - } else { - previousTagTime = periodHandler.previousTagTime(previousTagTime); - } - } - if (start == null) { + List> previousTags = + tagManager.tagObjects().stream() + .filter(p -> periodHandler.isAutoTag(p.getRight())) + .map(p -> Pair.of(p.getLeft(), periodHandler.tagToTime(p.getRight()))) + .filter(p -> p.getRight().isBefore(endTagTime)) + .sorted((tag1, tag2) -> tag2.getRight().compareTo(tag1.getRight())) + .collect(Collectors.toList()); + + if (previousTags.isEmpty()) { + LOG.info("Didn't found earlier tags for {}.", endTagName); return new EmptyResultStartingScanner(snapshotManager); } + LOG.info("Found start tag {} .", periodHandler.timeToTag(previousTags.get(0).getRight())); + Snapshot start = previousTags.get(0).getLeft().trimToSnapshot(); return new IncrementalTagStartingScanner(snapshotManager, start, end); } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java index 0214f0d99e2b..416f375bdc58 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java @@ -25,6 +25,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.manifest.ManifestCommittable; +import org.apache.paimon.options.ExpireConfig; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.TableCommitImpl; @@ -384,6 +385,19 @@ public void testIncrementalToAutoTag() throws Exception { assertThat(tagManager.allTagNames()).containsOnly("2024-12-01", "2024-12-02", "2024-12-04"); + assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-01"))).isEmpty(); + assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-02"))) + .containsExactly(GenericRow.of(2, BinaryString.fromString("b"))); + assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-03"))).isEmpty(); + assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-04"))) + .containsExactly(GenericRow.of(3, BinaryString.fromString("c"))); + + // validate after snapshot expire + table.newExpireSnapshots() + .config(ExpireConfig.builder().snapshotRetainMax(1).snapshotRetainMin(1).build()) + .expire(); + assertThat(table.snapshotManager().snapshotCount()).isEqualTo(1); + assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-01"))).isEmpty(); assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-02"))) .containsExactly(GenericRow.of(2, BinaryString.fromString("b")));