From d8fb5e8d6a2d795ad387098bd6587ae2161f162b Mon Sep 17 00:00:00 2001 From: yuzelin Date: Mon, 6 Jan 2025 10:11:17 +0800 Subject: [PATCH 1/7] [core] Introduce incremental-to for reading changes between auto tags --- docs/content/flink/sql-query.md | 17 ++++ .../generated/core_configuration.html | 8 +- .../java/org/apache/paimon/CoreOptions.java | 22 ++++- .../paimon/schema/SchemaValidation.java | 30 +++++-- .../table/source/AbstractDataTableScan.java | 6 +- .../snapshot/EmptyResultStartingScanner.java | 34 +++++++ .../IncrementalTagStartingScanner.java | 90 +++++++++++++++++-- .../apache/paimon/tag/TagPeriodHandler.java | 7 ++ .../paimon/table/IncrementalTableTest.java | 70 +++++++++++++++ 9 files changed, 269 insertions(+), 15 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/EmptyResultStartingScanner.java diff --git a/docs/content/flink/sql-query.md b/docs/content/flink/sql-query.md index 89136b0b0635..b492d8c7ff1d 100644 --- a/docs/content/flink/sql-query.md +++ b/docs/content/flink/sql-query.md @@ -105,6 +105,23 @@ If you want see `DELETE` records, you can use audit_log table: SELECT * FROM t$audit_log /*+ OPTIONS('incremental-between' = '12,20') */; ``` +### Batch Incremental between Auto-created Tags + +You can use `incremental-between` to query incremental changes between two tags. But for auto-created tag, the tag may +not be created in-time because of data delay. + +For example, assume that tags '2024-12-01', '2024-12-02' and '2024-12-04' are auto created daily. Data for 12/03 are delayed +and ingested with data for 12/04. Now if you want to query the incremental changes between tags, and you don't know the tag +of 12/03 is not created, you will use `incremental-between` with '2024-12-01,2024-12-02', '2024-12-02,2024-12-03' and +'2024-12-03,2024-12-04' respectively, then you will get an error that the tag '2024-12-03' doesn't exist. + +We introduced a new option `incremental-to` for this scenario. You can only specify the end tag, and Paimon will find an earlier +tag and return changes between them. If the tag doesn't exist or the earlier tag doesn't exist, return empty. + +For example, when you query 'incremental-to=2024-12-01' or 'incremental-to=2024-12-03', the result is empty; Query +'incremental-to=2024-12-02', the result is change between 12/01 and 12/02; Query 'incremental-to=2024-12-04', the result +is change between 12/02 and 12/04. + ## Streaming Query By default, Streaming read produces the latest snapshot on the table upon first startup, diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index f60d0ec91009..9a58e40f7ff1 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -392,6 +392,12 @@ String Read incremental changes between start timestamp (exclusive) and end timestamp, for example, 't1,t2' means changes between timestamp t1 and timestamp t2. + +
incremental-to
+ (none) + String + Used for "incremental-to-auto-tag" to specify the auto-created tag to reading incremental changes. +
local-merge-buffer-size
(none) @@ -717,7 +723,7 @@
scan.mode
default

Enum

- Specify the scanning behavior of the source.

Possible values: + Specify the scanning behavior of the source.

Possible values:
scan.plan-sort-partition
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index efd886501266..16dc276c2775 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1080,6 +1080,13 @@ public class CoreOptions implements Serializable { "Read incremental changes between start timestamp (exclusive) and end timestamp, " + "for example, 't1,t2' means changes between timestamp t1 and timestamp t2."); + public static final ConfigOption INCREMENTAL_TO = + key("incremental-to") + .stringType() + .noDefaultValue() + .withDescription( + "Used for \"incremental-to-auto-tag\" to specify the auto-created tag to reading incremental changes."); + public static final ConfigOption END_INPUT_CHECK_PARTITION_EXPIRE = key("end-input.check-partition-expire") .booleanType() @@ -2119,6 +2126,10 @@ public IncrementalBetweenScanMode incrementalBetweenScanMode() { return options.get(INCREMENTAL_BETWEEN_SCAN_MODE); } + public String incrementalTo() { + return options.get(INCREMENTAL_TO); + } + public Integer scanManifestParallelism() { return options.get(SCAN_MANIFEST_PARALLELISM); } @@ -2470,7 +2481,12 @@ public enum StartupMode implements DescribedEnum { INCREMENTAL( "incremental", - "Read incremental changes between start and end snapshot or timestamp."); + "Read incremental changes between start and end snapshot or timestamp."), + + INCREMENTAL_TO_AUTO_TAG( + "incremental-to-auto-tag", + "Specify an auto-created tag, then try to find an earlier auto-created tag to read incremental changes. " + + "If specified tag is the first auto-created tag or doesn't exist, the result is empty."); private final String value; private final String description; @@ -2756,6 +2772,10 @@ public static void setDefaultValues(Options options) { && !options.contains(SCAN_MODE)) { options.set(SCAN_MODE, StartupMode.INCREMENTAL); } + + if (options.contains(INCREMENTAL_TO) && !options.contains(SCAN_MODE)) { + options.set(SCAN_MODE, StartupMode.INCREMENTAL_TO_AUTO_TAG); + } } public static List> getOptions() { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 4bddcdd72d7e..76c4f826445f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -62,6 +62,7 @@ import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS; import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN; import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP; +import static org.apache.paimon.CoreOptions.INCREMENTAL_TO; import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS; import static org.apache.paimon.CoreOptions.SCAN_MODE; import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID; @@ -278,7 +279,8 @@ private static void validateStartupMode(CoreOptions options) { SCAN_FILE_CREATION_TIME_MILLIS, SCAN_TAG_NAME, INCREMENTAL_BETWEEN_TIMESTAMP, - INCREMENTAL_BETWEEN), + INCREMENTAL_BETWEEN, + INCREMENTAL_TO), Arrays.asList(SCAN_TIMESTAMP_MILLIS, SCAN_TIMESTAMP)); } else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT) { checkExactOneOptionExistInMode( @@ -294,7 +296,8 @@ private static void validateStartupMode(CoreOptions options) { SCAN_TIMESTAMP, SCAN_FILE_CREATION_TIME_MILLIS, INCREMENTAL_BETWEEN_TIMESTAMP, - INCREMENTAL_BETWEEN), + INCREMENTAL_BETWEEN, + INCREMENTAL_TO), Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME)); } else if (options.startupMode() == CoreOptions.StartupMode.INCREMENTAL) { checkExactOneOptionExistInMode( @@ -309,8 +312,22 @@ private static void validateStartupMode(CoreOptions options) { SCAN_TIMESTAMP_MILLIS, SCAN_FILE_CREATION_TIME_MILLIS, SCAN_TIMESTAMP, - SCAN_TAG_NAME), + SCAN_TAG_NAME, + INCREMENTAL_TO), Arrays.asList(INCREMENTAL_BETWEEN, INCREMENTAL_BETWEEN_TIMESTAMP)); + } else if (options.startupMode() == CoreOptions.StartupMode.INCREMENTAL_TO_AUTO_TAG) { + checkExactOneOptionExistInMode(options, options.startupMode(), INCREMENTAL_TO); + checkOptionsConflict( + options, + Arrays.asList( + SCAN_SNAPSHOT_ID, + SCAN_TIMESTAMP_MILLIS, + SCAN_FILE_CREATION_TIME_MILLIS, + SCAN_TIMESTAMP, + SCAN_TAG_NAME, + INCREMENTAL_BETWEEN, + INCREMENTAL_BETWEEN_TIMESTAMP), + Collections.singletonList(INCREMENTAL_TO)); } else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) { checkOptionExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode()); checkOptionsConflict( @@ -321,7 +338,8 @@ private static void validateStartupMode(CoreOptions options) { SCAN_FILE_CREATION_TIME_MILLIS, SCAN_TAG_NAME, INCREMENTAL_BETWEEN_TIMESTAMP, - INCREMENTAL_BETWEEN), + INCREMENTAL_BETWEEN, + INCREMENTAL_TO), Collections.singletonList(SCAN_SNAPSHOT_ID)); } else if (options.startupMode() == CoreOptions.StartupMode.FROM_FILE_CREATION_TIME) { checkOptionExistInMode( @@ -335,7 +353,8 @@ private static void validateStartupMode(CoreOptions options) { SCAN_TIMESTAMP_MILLIS, SCAN_TAG_NAME, INCREMENTAL_BETWEEN_TIMESTAMP, - INCREMENTAL_BETWEEN), + INCREMENTAL_BETWEEN, + INCREMENTAL_TO), Collections.singletonList(SCAN_FILE_CREATION_TIME_MILLIS)); } else { checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS, options.startupMode()); @@ -347,6 +366,7 @@ private static void validateStartupMode(CoreOptions options) { checkOptionNotExistInMode( options, INCREMENTAL_BETWEEN_TIMESTAMP, options.startupMode()); checkOptionNotExistInMode(options, INCREMENTAL_BETWEEN, options.startupMode()); + checkOptionNotExistInMode(options, INCREMENTAL_TO, options.startupMode()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index 24c6943f546f..fe265d809dcd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -228,7 +228,7 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { Long.parseLong(incrementalBetween.getRight()), scanMode); } catch (NumberFormatException e) { - return new IncrementalTagStartingScanner( + return IncrementalTagStartingScanner.create( snapshotManager, incrementalBetween.getLeft(), incrementalBetween.getRight()); @@ -240,6 +240,10 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { Long.parseLong(incrementalBetween.getRight()), scanMode); } + case INCREMENTAL_TO_AUTO_TAG: + checkArgument(!isStreaming, "Cannot read incremental in streaming mode."); + String endTag = options.incrementalTo(); + return IncrementalTagStartingScanner.create(snapshotManager, endTag, options); default: throw new UnsupportedOperationException( "Unknown startup mode " + startupMode.name()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/EmptyResultStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/EmptyResultStartingScanner.java new file mode 100644 index 000000000000..fc38e272d517 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/EmptyResultStartingScanner.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.snapshot; + +import org.apache.paimon.utils.SnapshotManager; + +/** This scanner always return an empty result. */ +public class EmptyResultStartingScanner extends AbstractStartingScanner { + + EmptyResultStartingScanner(SnapshotManager snapshotManager) { + super(snapshotManager); + } + + @Override + public Result scan(SnapshotReader snapshotReader) { + return new NoSnapshot(); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java index e08ac9f44c60..2926481672ac 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java @@ -18,10 +18,21 @@ package org.apache.paimon.table.source.snapshot; +import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; +import org.apache.paimon.tag.Tag; +import org.apache.paimon.tag.TagPeriodHandler; +import org.apache.paimon.tag.TagTimeExtractor; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; +import java.time.LocalDateTime; +import java.util.Optional; + +import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.Preconditions.checkNotNull; +import static org.apache.paimon.utils.Preconditions.checkState; + /** {@link StartingScanner} for incremental changes by tag. */ public class IncrementalTagStartingScanner extends AbstractStartingScanner { @@ -29,23 +40,88 @@ public class IncrementalTagStartingScanner extends AbstractStartingScanner { private final Snapshot end; public IncrementalTagStartingScanner( - SnapshotManager snapshotManager, String startTagName, String endTagName) { + SnapshotManager snapshotManager, Snapshot start, Snapshot end) { super(snapshotManager); + this.start = start; + this.end = end; + this.startingSnapshotId = start.id(); + } + + @Override + public Result scan(SnapshotReader reader) { + return StartingScanner.fromPlan(reader.withSnapshot(end).readIncrementalDiff(start)); + } + + public static IncrementalTagStartingScanner create( + SnapshotManager snapshotManager, String startTagName, String endTagName) { TagManager tagManager = new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); - start = tagManager.getOrThrow(startTagName).trimToSnapshot(); - end = tagManager.getOrThrow(endTagName).trimToSnapshot(); + Snapshot start = tagManager.getOrThrow(startTagName).trimToSnapshot(); + Snapshot end = tagManager.getOrThrow(endTagName).trimToSnapshot(); if (end.id() <= start.id()) { throw new IllegalArgumentException( String.format( "Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s", endTagName, end.id(), startTagName, start.id())); } - this.startingSnapshotId = start.id(); + return new IncrementalTagStartingScanner(snapshotManager, start, end); } - @Override - public Result scan(SnapshotReader reader) { - return StartingScanner.fromPlan(reader.withSnapshot(end).readIncrementalDiff(start)); + public static AbstractStartingScanner create( + SnapshotManager snapshotManager, String endTagName, CoreOptions options) { + TagTimeExtractor extractor = TagTimeExtractor.createForAutoTag(options); + checkNotNull( + extractor, + "Table's tag creation mode doesn't support '%s' scan mode.", + CoreOptions.StartupMode.INCREMENTAL_TO_AUTO_TAG); + TagPeriodHandler periodHandler = TagPeriodHandler.create(options); + checkArgument( + periodHandler.isAutoTag(endTagName), + "Specified tag '%s' is not an auto-created tag.", + endTagName); + + TagManager tagManager = + new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); + + Optional endTag = tagManager.get(endTagName); + if (!endTag.isPresent()) { + return new EmptyResultStartingScanner(snapshotManager); + } + + Snapshot end = endTag.get().trimToSnapshot(); + + Snapshot earliestSnapshot = snapshotManager.earliestSnapshot(); + checkState(earliestSnapshot != null, "No tags can be found."); + + LocalDateTime earliestTime = + extractor + .extract(earliestSnapshot.timeMillis(), earliestSnapshot.watermark()) + .orElseThrow( + () -> + new RuntimeException( + "Cannot get valid tag time from the earliest snapshot.")); + LocalDateTime earliestTagTime = periodHandler.normalizeToPreviousTag(earliestTime); + + LocalDateTime endTagTime = periodHandler.tagToTime(endTagName); + LocalDateTime previousTagTime = periodHandler.previousTagTime(endTagTime); + + Snapshot start = null; + while (previousTagTime.isAfter(earliestTagTime) + || previousTagTime.isEqual(earliestTagTime)) { + String previousTagName = periodHandler.timeToTag(previousTagTime); + Optional previousTag = tagManager.get(previousTagName); + if (previousTag.isPresent()) { + start = previousTag.get().trimToSnapshot(); + break; + } else { + previousTagTime = periodHandler.previousTagTime(previousTagTime); + } + } + + if (start == null) { + return new EmptyResultStartingScanner(snapshotManager); + } + + return new IncrementalTagStartingScanner(snapshotManager, start, end); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java index c0fbe718c8fa..4934e26b22f6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagPeriodHandler.java @@ -88,6 +88,8 @@ public interface TagPeriodHandler { LocalDateTime nextTagTime(LocalDateTime time); + LocalDateTime previousTagTime(LocalDateTime time); + boolean isAutoTag(String tagName); /** Base implementation of {@link TagPeriodHandler}. */ @@ -127,6 +129,11 @@ public LocalDateTime nextTagTime(LocalDateTime time) { return time.plus(onePeriod()); } + @Override + public LocalDateTime previousTagTime(LocalDateTime time) { + return time.minus(onePeriod()); + } + @Override public boolean isAutoTag(String tagName) { try { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java index 43fbe2b6460a..7fd9c00d34de 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java @@ -20,18 +20,28 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.data.Timestamp; +import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.sink.TableWriteImpl; import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.TagManager; import org.junit.jupiter.api.Test; +import java.time.LocalDateTime; +import java.util.Collections; import java.util.List; import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN; +import static org.apache.paimon.CoreOptions.INCREMENTAL_TO; import static org.apache.paimon.data.BinaryString.fromString; import static org.apache.paimon.io.DataFileTestUtils.row; import static org.assertj.core.api.Assertions.assertThat; @@ -301,4 +311,64 @@ public void testAppendTableTag() throws Exception { assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN, "TAG1,TAG2"))) .containsExactlyInAnyOrder(GenericRow.of(1, 1, 2)); } + + @Test + public void testIncrementalToAutoTag() throws Exception { + Identifier identifier = identifier("T"); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .primaryKey("a") + .option("bucket", "1") + .option("tag.automatic-creation", "watermark") + .option("tag.creation-period", "daily") + .build(); + catalog.createTable(identifier, schema, false); + FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); + + TableWriteImpl write = table.newWrite(commitUser); + TableCommitImpl commit = table.newCommit(commitUser).ignoreEmptyCommit(false); + TagManager tagManager = table.tagManager(); + + write.write(GenericRow.of(1, BinaryString.fromString("a"))); + List commitMessages = write.prepareCommit(false, 0); + commit.commit( + new ManifestCommittable( + 0, + utcMills("2024-12-02T10:00:00"), + Collections.emptyMap(), + commitMessages)); + + write.write(GenericRow.of(2, BinaryString.fromString("b"))); + commitMessages = write.prepareCommit(false, 1); + commit.commit( + new ManifestCommittable( + 1, + utcMills("2024-12-03T10:00:00"), + Collections.emptyMap(), + commitMessages)); + + write.write(GenericRow.of(3, BinaryString.fromString("c"))); + commitMessages = write.prepareCommit(false, 2); + commit.commit( + new ManifestCommittable( + 2, + utcMills("2024-12-05T10:00:00"), + Collections.emptyMap(), + commitMessages)); + + assertThat(tagManager.allTagNames()).containsOnly("2024-12-01", "2024-12-02", "2024-12-04"); + + assertThat(read(table, Pair.of(INCREMENTAL_TO, "2024-12-01"))).isEmpty(); + assertThat(read(table, Pair.of(INCREMENTAL_TO, "2024-12-02"))) + .containsExactly(GenericRow.of(2, BinaryString.fromString("b"))); + assertThat(read(table, Pair.of(INCREMENTAL_TO, "2024-12-03"))).isEmpty(); + assertThat(read(table, Pair.of(INCREMENTAL_TO, "2024-12-04"))) + .containsExactly(GenericRow.of(3, BinaryString.fromString("c"))); + } + + private static long utcMills(String timestamp) { + return Timestamp.fromLocalDateTime(LocalDateTime.parse(timestamp)).getMillisecond(); + } } From 1ad38eb9345d19d88fa88f131b17d03a33ded04c Mon Sep 17 00:00:00 2001 From: yuzelin Date: Mon, 6 Jan 2025 11:11:22 +0800 Subject: [PATCH 2/7] fix --- .../test/java/org/apache/paimon/flink/FlinkCatalogTest.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java index 4b8cf7912192..5c99ba054430 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java @@ -984,9 +984,12 @@ private static Stream> optionProvider(boolean isStreaming) { options.put(SCAN_FILE_CREATION_TIME_MILLIS.key(), System.currentTimeMillis() + ""); } else if (mode == CoreOptions.StartupMode.INCREMENTAL) { options.put("incremental-between", "2,5"); + } else if (mode == CoreOptions.StartupMode.INCREMENTAL_TO_AUTO_TAG) { + options.put("incremental-to", "2024-12-01"); } - if (isStreaming && mode == CoreOptions.StartupMode.INCREMENTAL) { + if (isStreaming && mode == CoreOptions.StartupMode.INCREMENTAL + || mode == CoreOptions.StartupMode.INCREMENTAL_TO_AUTO_TAG) { continue; } allOptions.add(options); From ab1062889e8c7a2854e4866f0aa3aa60ab708ee7 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Mon, 6 Jan 2025 15:33:28 +0800 Subject: [PATCH 3/7] fix --- .../generated/core_configuration.html | 8 +---- .../java/org/apache/paimon/CoreOptions.java | 22 ++++--------- .../paimon/schema/SchemaValidation.java | 32 +++++++------------ .../table/source/AbstractDataTableScan.java | 19 +++++++---- .../IncrementalTagStartingScanner.java | 2 +- .../paimon/table/IncrementalTableTest.java | 10 +++--- .../apache/paimon/flink/FlinkCatalogTest.java | 5 +-- 7 files changed, 38 insertions(+), 60 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 9a58e40f7ff1..f60d0ec91009 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -392,12 +392,6 @@ String Read incremental changes between start timestamp (exclusive) and end timestamp, for example, 't1,t2' means changes between timestamp t1 and timestamp t2. - -
incremental-to
- (none) - String - Used for "incremental-to-auto-tag" to specify the auto-created tag to reading incremental changes. -
local-merge-buffer-size
(none) @@ -723,7 +717,7 @@
scan.mode
default

Enum

- Specify the scanning behavior of the source.

Possible values:
  • "default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is set the actual startup mode will be "from-snapshot". Otherwise the actual startup mode will be "latest-full".
  • "latest-full": For streaming sources, produces the latest snapshot on the table upon first startup, and continue to read the latest changes. For batch sources, just produce the latest snapshot but does not read new changes.
  • "full": Deprecated. Same as "latest-full".
  • "latest": For streaming sources, continuously reads latest changes without producing a snapshot at the beginning. For batch sources, behaves the same as the "latest-full" startup mode.
  • "compacted-full": For streaming sources, produces a snapshot after the latest compaction on the table upon first startup, and continue to read the latest changes. For batch sources, just produce a snapshot after the latest compaction but does not read new changes. Snapshots of full compaction are picked when scheduled full-compaction is enabled.
  • "from-timestamp": For streaming sources, continuously reads changes starting from timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning. For batch sources, produces a snapshot at timestamp specified by "scan.timestamp-millis" but does not read new changes.
  • "from-file-creation-time": For streaming and batch sources, produces a snapshot and filters the data files by creation time. For streaming sources, upon first startup, and continue to read the latest changes.
  • "from-snapshot": For streaming sources, continuously reads changes starting from snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning. For batch sources, produces a snapshot specified by "scan.snapshot-id" or "scan.tag-name" but does not read new changes.
  • "from-snapshot-full": For streaming sources, produces from snapshot specified by "scan.snapshot-id" on the table upon first startup, and continuously reads changes. For batch sources, produces a snapshot specified by "scan.snapshot-id" but does not read new changes.
  • "incremental": Read incremental changes between start and end snapshot or timestamp.
  • "incremental-to-auto-tag": Specify an auto-created tag, then try to find an earlier auto-created tag to read incremental changes. If specified tag is the first auto-created tag or doesn't exist, the result is empty.
+ Specify the scanning behavior of the source.

Possible values:
  • "default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is set the actual startup mode will be "from-snapshot". Otherwise the actual startup mode will be "latest-full".
  • "latest-full": For streaming sources, produces the latest snapshot on the table upon first startup, and continue to read the latest changes. For batch sources, just produce the latest snapshot but does not read new changes.
  • "full": Deprecated. Same as "latest-full".
  • "latest": For streaming sources, continuously reads latest changes without producing a snapshot at the beginning. For batch sources, behaves the same as the "latest-full" startup mode.
  • "compacted-full": For streaming sources, produces a snapshot after the latest compaction on the table upon first startup, and continue to read the latest changes. For batch sources, just produce a snapshot after the latest compaction but does not read new changes. Snapshots of full compaction are picked when scheduled full-compaction is enabled.
  • "from-timestamp": For streaming sources, continuously reads changes starting from timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning. For batch sources, produces a snapshot at timestamp specified by "scan.timestamp-millis" but does not read new changes.
  • "from-file-creation-time": For streaming and batch sources, produces a snapshot and filters the data files by creation time. For streaming sources, upon first startup, and continue to read the latest changes.
  • "from-snapshot": For streaming sources, continuously reads changes starting from snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning. For batch sources, produces a snapshot specified by "scan.snapshot-id" or "scan.tag-name" but does not read new changes.
  • "from-snapshot-full": For streaming sources, produces from snapshot specified by "scan.snapshot-id" on the table upon first startup, and continuously reads changes. For batch sources, produces a snapshot specified by "scan.snapshot-id" but does not read new changes.
  • "incremental": Read incremental changes between start and end snapshot or timestamp.
scan.plan-sort-partition
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 16dc276c2775..928d29354cbb 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1080,8 +1080,8 @@ public class CoreOptions implements Serializable { "Read incremental changes between start timestamp (exclusive) and end timestamp, " + "for example, 't1,t2' means changes between timestamp t1 and timestamp t2."); - public static final ConfigOption INCREMENTAL_TO = - key("incremental-to") + public static final ConfigOption INCREMENTAL_TO_AUTO_TAG = + key("incremental-to-auto-tag") .stringType() .noDefaultValue() .withDescription( @@ -2126,8 +2126,8 @@ public IncrementalBetweenScanMode incrementalBetweenScanMode() { return options.get(INCREMENTAL_BETWEEN_SCAN_MODE); } - public String incrementalTo() { - return options.get(INCREMENTAL_TO); + public String incrementalToAutoTag() { + return options.get(INCREMENTAL_TO_AUTO_TAG); } public Integer scanManifestParallelism() { @@ -2481,12 +2481,7 @@ public enum StartupMode implements DescribedEnum { INCREMENTAL( "incremental", - "Read incremental changes between start and end snapshot or timestamp."), - - INCREMENTAL_TO_AUTO_TAG( - "incremental-to-auto-tag", - "Specify an auto-created tag, then try to find an earlier auto-created tag to read incremental changes. " - + "If specified tag is the first auto-created tag or doesn't exist, the result is empty."); + "Read incremental changes between start and end snapshot or timestamp."); private final String value; private final String description; @@ -2768,14 +2763,11 @@ public static void setDefaultValues(Options options) { } if ((options.contains(INCREMENTAL_BETWEEN_TIMESTAMP) - || options.contains(INCREMENTAL_BETWEEN)) + || options.contains(INCREMENTAL_BETWEEN) + || options.contains(INCREMENTAL_TO_AUTO_TAG)) && !options.contains(SCAN_MODE)) { options.set(SCAN_MODE, StartupMode.INCREMENTAL); } - - if (options.contains(INCREMENTAL_TO) && !options.contains(SCAN_MODE)) { - options.set(SCAN_MODE, StartupMode.INCREMENTAL_TO_AUTO_TAG); - } } public static List> getOptions() { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index 76c4f826445f..ae24fb09576c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -62,7 +62,7 @@ import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS; import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN; import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP; -import static org.apache.paimon.CoreOptions.INCREMENTAL_TO; +import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG; import static org.apache.paimon.CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS; import static org.apache.paimon.CoreOptions.SCAN_MODE; import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID; @@ -280,7 +280,7 @@ private static void validateStartupMode(CoreOptions options) { SCAN_TAG_NAME, INCREMENTAL_BETWEEN_TIMESTAMP, INCREMENTAL_BETWEEN, - INCREMENTAL_TO), + INCREMENTAL_TO_AUTO_TAG), Arrays.asList(SCAN_TIMESTAMP_MILLIS, SCAN_TIMESTAMP)); } else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT) { checkExactOneOptionExistInMode( @@ -297,14 +297,15 @@ private static void validateStartupMode(CoreOptions options) { SCAN_FILE_CREATION_TIME_MILLIS, INCREMENTAL_BETWEEN_TIMESTAMP, INCREMENTAL_BETWEEN, - INCREMENTAL_TO), + INCREMENTAL_TO_AUTO_TAG), Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME)); } else if (options.startupMode() == CoreOptions.StartupMode.INCREMENTAL) { checkExactOneOptionExistInMode( options, options.startupMode(), INCREMENTAL_BETWEEN, - INCREMENTAL_BETWEEN_TIMESTAMP); + INCREMENTAL_BETWEEN_TIMESTAMP, + INCREMENTAL_TO_AUTO_TAG); checkOptionsConflict( options, Arrays.asList( @@ -312,22 +313,11 @@ private static void validateStartupMode(CoreOptions options) { SCAN_TIMESTAMP_MILLIS, SCAN_FILE_CREATION_TIME_MILLIS, SCAN_TIMESTAMP, - SCAN_TAG_NAME, - INCREMENTAL_TO), - Arrays.asList(INCREMENTAL_BETWEEN, INCREMENTAL_BETWEEN_TIMESTAMP)); - } else if (options.startupMode() == CoreOptions.StartupMode.INCREMENTAL_TO_AUTO_TAG) { - checkExactOneOptionExistInMode(options, options.startupMode(), INCREMENTAL_TO); - checkOptionsConflict( - options, + SCAN_TAG_NAME), Arrays.asList( - SCAN_SNAPSHOT_ID, - SCAN_TIMESTAMP_MILLIS, - SCAN_FILE_CREATION_TIME_MILLIS, - SCAN_TIMESTAMP, - SCAN_TAG_NAME, INCREMENTAL_BETWEEN, - INCREMENTAL_BETWEEN_TIMESTAMP), - Collections.singletonList(INCREMENTAL_TO)); + INCREMENTAL_BETWEEN_TIMESTAMP, + INCREMENTAL_TO_AUTO_TAG)); } else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) { checkOptionExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode()); checkOptionsConflict( @@ -339,7 +329,7 @@ private static void validateStartupMode(CoreOptions options) { SCAN_TAG_NAME, INCREMENTAL_BETWEEN_TIMESTAMP, INCREMENTAL_BETWEEN, - INCREMENTAL_TO), + INCREMENTAL_TO_AUTO_TAG), Collections.singletonList(SCAN_SNAPSHOT_ID)); } else if (options.startupMode() == CoreOptions.StartupMode.FROM_FILE_CREATION_TIME) { checkOptionExistInMode( @@ -354,7 +344,7 @@ private static void validateStartupMode(CoreOptions options) { SCAN_TAG_NAME, INCREMENTAL_BETWEEN_TIMESTAMP, INCREMENTAL_BETWEEN, - INCREMENTAL_TO), + INCREMENTAL_TO_AUTO_TAG), Collections.singletonList(SCAN_FILE_CREATION_TIME_MILLIS)); } else { checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS, options.startupMode()); @@ -366,7 +356,7 @@ private static void validateStartupMode(CoreOptions options) { checkOptionNotExistInMode( options, INCREMENTAL_BETWEEN_TIMESTAMP, options.startupMode()); checkOptionNotExistInMode(options, INCREMENTAL_BETWEEN, options.startupMode()); - checkOptionNotExistInMode(options, INCREMENTAL_TO, options.startupMode()); + checkOptionNotExistInMode(options, INCREMENTAL_TO_AUTO_TAG, options.startupMode()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index fe265d809dcd..9710a3a1f26c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -26,6 +26,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.metrics.MetricRegistry; import org.apache.paimon.operation.FileStoreScan; +import org.apache.paimon.options.Options; import org.apache.paimon.table.source.snapshot.CompactedStartingScanner; import org.apache.paimon.table.source.snapshot.ContinuousCompactorStartingScanner; import org.apache.paimon.table.source.snapshot.ContinuousFromSnapshotFullStartingScanner; @@ -199,7 +200,6 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { : new StaticFromSnapshotStartingScanner(snapshotManager, scanSnapshotId); case INCREMENTAL: checkArgument(!isStreaming, "Cannot read incremental in streaming mode."); - Pair incrementalBetween = options.incrementalBetween(); CoreOptions.IncrementalBetweenScanMode scanType = options.incrementalBetweenScanMode(); ScanMode scanMode; @@ -220,7 +220,10 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { throw new UnsupportedOperationException( "Unknown incremental scan type " + scanType.name()); } - if (options.toMap().get(CoreOptions.INCREMENTAL_BETWEEN.key()) != null) { + + Options conf = options.toConfiguration(); + if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN)) { + Pair incrementalBetween = options.incrementalBetween(); try { return new IncrementalStartingScanner( snapshotManager, @@ -233,17 +236,19 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { incrementalBetween.getLeft(), incrementalBetween.getRight()); } - } else { + } else if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)) { + Pair incrementalBetween = options.incrementalBetween(); return new IncrementalTimeStampStartingScanner( snapshotManager, Long.parseLong(incrementalBetween.getLeft()), Long.parseLong(incrementalBetween.getRight()), scanMode); + } else if (conf.contains(CoreOptions.INCREMENTAL_TO_AUTO_TAG)) { + String endTag = options.incrementalToAutoTag(); + return IncrementalTagStartingScanner.create(snapshotManager, endTag, options); + } else { + throw new UnsupportedOperationException("Unknown incremental read mode."); } - case INCREMENTAL_TO_AUTO_TAG: - checkArgument(!isStreaming, "Cannot read incremental in streaming mode."); - String endTag = options.incrementalTo(); - return IncrementalTagStartingScanner.create(snapshotManager, endTag, options); default: throw new UnsupportedOperationException( "Unknown startup mode " + startupMode.name()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java index 2926481672ac..37114db11688 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java @@ -73,7 +73,7 @@ public static AbstractStartingScanner create( checkNotNull( extractor, "Table's tag creation mode doesn't support '%s' scan mode.", - CoreOptions.StartupMode.INCREMENTAL_TO_AUTO_TAG); + CoreOptions.INCREMENTAL_TO_AUTO_TAG); TagPeriodHandler periodHandler = TagPeriodHandler.create(options); checkArgument( periodHandler.isAutoTag(endTagName), diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java index 7fd9c00d34de..cc595189d0fe 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java @@ -41,7 +41,7 @@ import java.util.List; import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN; -import static org.apache.paimon.CoreOptions.INCREMENTAL_TO; +import static org.apache.paimon.CoreOptions.INCREMENTAL_TO_AUTO_TAG; import static org.apache.paimon.data.BinaryString.fromString; import static org.apache.paimon.io.DataFileTestUtils.row; import static org.assertj.core.api.Assertions.assertThat; @@ -360,11 +360,11 @@ public void testIncrementalToAutoTag() throws Exception { assertThat(tagManager.allTagNames()).containsOnly("2024-12-01", "2024-12-02", "2024-12-04"); - assertThat(read(table, Pair.of(INCREMENTAL_TO, "2024-12-01"))).isEmpty(); - assertThat(read(table, Pair.of(INCREMENTAL_TO, "2024-12-02"))) + assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-01"))).isEmpty(); + assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-02"))) .containsExactly(GenericRow.of(2, BinaryString.fromString("b"))); - assertThat(read(table, Pair.of(INCREMENTAL_TO, "2024-12-03"))).isEmpty(); - assertThat(read(table, Pair.of(INCREMENTAL_TO, "2024-12-04"))) + assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-03"))).isEmpty(); + assertThat(read(table, Pair.of(INCREMENTAL_TO_AUTO_TAG, "2024-12-04"))) .containsExactly(GenericRow.of(3, BinaryString.fromString("c"))); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java index 5c99ba054430..4b8cf7912192 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/FlinkCatalogTest.java @@ -984,12 +984,9 @@ private static Stream> optionProvider(boolean isStreaming) { options.put(SCAN_FILE_CREATION_TIME_MILLIS.key(), System.currentTimeMillis() + ""); } else if (mode == CoreOptions.StartupMode.INCREMENTAL) { options.put("incremental-between", "2,5"); - } else if (mode == CoreOptions.StartupMode.INCREMENTAL_TO_AUTO_TAG) { - options.put("incremental-to", "2024-12-01"); } - if (isStreaming && mode == CoreOptions.StartupMode.INCREMENTAL - || mode == CoreOptions.StartupMode.INCREMENTAL_TO_AUTO_TAG) { + if (isStreaming && mode == CoreOptions.StartupMode.INCREMENTAL) { continue; } allOptions.add(options); From bd36046b4041d0d1b54365869d2e35b3b57f5de1 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Mon, 6 Jan 2025 20:33:32 +0800 Subject: [PATCH 4/7] tag first --- .../table/source/AbstractDataTableScan.java | 136 +++++++++++------- .../IncrementalTagStartingScanner.java | 15 -- .../paimon/table/IncrementalTableTest.java | 24 ++++ 3 files changed, 111 insertions(+), 64 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index 9710a3a1f26c..a5810bfc24b2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.CoreOptions.ChangelogProducer; +import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.consumer.Consumer; import org.apache.paimon.consumer.ConsumerManager; @@ -45,21 +46,29 @@ import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner; import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner; import org.apache.paimon.table.source.snapshot.StaticFromWatermarkStartingScanner; +import org.apache.paimon.tag.Tag; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; import java.util.Map; import java.util.Optional; import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS; +import static org.apache.paimon.CoreOptions.INCREMENTAL_BETWEEN; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.Preconditions.checkNotNull; /** An abstraction layer above {@link FileStoreScan} to provide input split generation. */ public abstract class AbstractDataTableScan implements DataTableScan { + private static final Logger LOG = LoggerFactory.getLogger(AbstractDataTableScan.class); + private final CoreOptions options; protected final SnapshotReader snapshotReader; @@ -200,58 +209,87 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { : new StaticFromSnapshotStartingScanner(snapshotManager, scanSnapshotId); case INCREMENTAL: checkArgument(!isStreaming, "Cannot read incremental in streaming mode."); - CoreOptions.IncrementalBetweenScanMode scanType = - options.incrementalBetweenScanMode(); - ScanMode scanMode; - switch (scanType) { - case AUTO: - scanMode = - options.changelogProducer() == ChangelogProducer.NONE - ? ScanMode.DELTA - : ScanMode.CHANGELOG; - break; - case DELTA: - scanMode = ScanMode.DELTA; - break; - case CHANGELOG: - scanMode = ScanMode.CHANGELOG; - break; - default: - throw new UnsupportedOperationException( - "Unknown incremental scan type " + scanType.name()); - } - - Options conf = options.toConfiguration(); - if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN)) { - Pair incrementalBetween = options.incrementalBetween(); - try { - return new IncrementalStartingScanner( - snapshotManager, - Long.parseLong(incrementalBetween.getLeft()), - Long.parseLong(incrementalBetween.getRight()), - scanMode); - } catch (NumberFormatException e) { - return IncrementalTagStartingScanner.create( - snapshotManager, - incrementalBetween.getLeft(), - incrementalBetween.getRight()); - } - } else if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)) { - Pair incrementalBetween = options.incrementalBetween(); - return new IncrementalTimeStampStartingScanner( - snapshotManager, - Long.parseLong(incrementalBetween.getLeft()), - Long.parseLong(incrementalBetween.getRight()), - scanMode); - } else if (conf.contains(CoreOptions.INCREMENTAL_TO_AUTO_TAG)) { - String endTag = options.incrementalToAutoTag(); - return IncrementalTagStartingScanner.create(snapshotManager, endTag, options); - } else { - throw new UnsupportedOperationException("Unknown incremental read mode."); - } + return createIncrementalStartingScanner(snapshotManager); default: throw new UnsupportedOperationException( "Unknown startup mode " + startupMode.name()); } } + + private StartingScanner createIncrementalStartingScanner(SnapshotManager snapshotManager) { + CoreOptions.IncrementalBetweenScanMode scanType = options.incrementalBetweenScanMode(); + ScanMode scanMode; + switch (scanType) { + case AUTO: + scanMode = + options.changelogProducer() == ChangelogProducer.NONE + ? ScanMode.DELTA + : ScanMode.CHANGELOG; + break; + case DELTA: + scanMode = ScanMode.DELTA; + break; + case CHANGELOG: + scanMode = ScanMode.CHANGELOG; + break; + default: + throw new UnsupportedOperationException( + "Unknown incremental scan type " + scanType.name()); + } + + Options conf = options.toConfiguration(); + TagManager tagManager = + new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); + if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN)) { + Pair incrementalBetween = options.incrementalBetween(); + Optional startTag = tagManager.get(incrementalBetween.getLeft()); + Optional endTag = tagManager.get(incrementalBetween.getRight()); + if (startTag.isPresent() && endTag.isPresent()) { + Snapshot start = startTag.get().trimToSnapshot(); + Snapshot end = endTag.get().trimToSnapshot(); + + LOG.info( + "{} start and end are parsed to tag with snapshot id {} to {}.", + INCREMENTAL_BETWEEN.key(), + start.id(), + end.id()); + + if (end.id() <= start.id()) { + throw new IllegalArgumentException( + String.format( + "Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s", + incrementalBetween.getRight(), + end.id(), + incrementalBetween.getLeft(), + start.id())); + } + return new IncrementalTagStartingScanner(snapshotManager, start, end); + } else { + long startId, endId; + try { + startId = Long.parseLong(incrementalBetween.getLeft()); + endId = Long.parseLong(incrementalBetween.getRight()); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + String.format( + "Didn't find two tags for start '%s' and end '%s', and they are not two snapshot Ids. " + + "Please set two tags or two snapshot Ids.", + incrementalBetween.getLeft(), incrementalBetween.getRight())); + } + return new IncrementalStartingScanner(snapshotManager, startId, endId, scanMode); + } + } else if (conf.contains(CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP)) { + Pair incrementalBetween = options.incrementalBetween(); + return new IncrementalTimeStampStartingScanner( + snapshotManager, + Long.parseLong(incrementalBetween.getLeft()), + Long.parseLong(incrementalBetween.getRight()), + scanMode); + } else if (conf.contains(CoreOptions.INCREMENTAL_TO_AUTO_TAG)) { + String endTag = options.incrementalToAutoTag(); + return IncrementalTagStartingScanner.create(snapshotManager, endTag, options); + } else { + throw new UnsupportedOperationException("Unknown incremental read mode."); + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java index 37114db11688..55212b1c9a93 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java @@ -52,21 +52,6 @@ public Result scan(SnapshotReader reader) { return StartingScanner.fromPlan(reader.withSnapshot(end).readIncrementalDiff(start)); } - public static IncrementalTagStartingScanner create( - SnapshotManager snapshotManager, String startTagName, String endTagName) { - TagManager tagManager = - new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); - Snapshot start = tagManager.getOrThrow(startTagName).trimToSnapshot(); - Snapshot end = tagManager.getOrThrow(endTagName).trimToSnapshot(); - if (end.id() <= start.id()) { - throw new IllegalArgumentException( - String.format( - "Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s", - endTagName, end.id(), startTagName, start.id())); - } - return new IncrementalTagStartingScanner(snapshotManager, start, end); - } - public static AbstractStartingScanner create( SnapshotManager snapshotManager, String endTagName, CoreOptions options) { TagTimeExtractor extractor = TagTimeExtractor.createForAutoTag(options); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java index cc595189d0fe..0214f0d99e2b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/IncrementalTableTest.java @@ -312,6 +312,30 @@ public void testAppendTableTag() throws Exception { .containsExactlyInAnyOrder(GenericRow.of(1, 1, 2)); } + @Test + public void testIncrementalToTagFirst() throws Exception { + Identifier identifier = identifier("T"); + Schema schema = + Schema.newBuilder() + .column("a", DataTypes.INT()) + .column("b", DataTypes.STRING()) + .primaryKey("a") + .option("bucket", "1") + .build(); + catalog.createTable(identifier, schema, false); + FileStoreTable table = (FileStoreTable) catalog.getTable(identifier); + + write(table, GenericRow.of(1, BinaryString.fromString("a"))); + write(table, GenericRow.of(2, BinaryString.fromString("b"))); + write(table, GenericRow.of(3, BinaryString.fromString("c"))); + + table.createTag("1", 1); + table.createTag("3", 2); + + assertThat(read(table, Pair.of(INCREMENTAL_BETWEEN, "1,3"))) + .containsExactlyInAnyOrder(GenericRow.of(2, BinaryString.fromString("b"))); + } + @Test public void testIncrementalToAutoTag() throws Exception { Identifier identifier = identifier("T"); From 6ce3d3951f9dc534e75eddfac88963978e09d8d7 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Mon, 6 Jan 2025 20:35:02 +0800 Subject: [PATCH 5/7] fix --- docs/content/flink/sql-query.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/content/flink/sql-query.md b/docs/content/flink/sql-query.md index b492d8c7ff1d..6cba9f930f5a 100644 --- a/docs/content/flink/sql-query.md +++ b/docs/content/flink/sql-query.md @@ -115,12 +115,12 @@ and ingested with data for 12/04. Now if you want to query the incremental chang of 12/03 is not created, you will use `incremental-between` with '2024-12-01,2024-12-02', '2024-12-02,2024-12-03' and '2024-12-03,2024-12-04' respectively, then you will get an error that the tag '2024-12-03' doesn't exist. -We introduced a new option `incremental-to` for this scenario. You can only specify the end tag, and Paimon will find an earlier -tag and return changes between them. If the tag doesn't exist or the earlier tag doesn't exist, return empty. +We introduced a new option `incremental-to-auto-tag` for this scenario. You can only specify the end tag, and Paimon will +find an earlier tag and return changes between them. If the tag doesn't exist or the earlier tag doesn't exist, return empty. -For example, when you query 'incremental-to=2024-12-01' or 'incremental-to=2024-12-03', the result is empty; Query -'incremental-to=2024-12-02', the result is change between 12/01 and 12/02; Query 'incremental-to=2024-12-04', the result -is change between 12/02 and 12/04. +For example, when you query 'incremental-to-auto-tag=2024-12-01' or 'incremental-to-auto-tag=2024-12-03', the result is +empty; Query 'incremental-to-auto-tag=2024-12-02', the result is change between 12/01 and 12/02; Query 'incremental-to-auto-tag=2024-12-04', +the result is change between 12/02 and 12/04. ## Streaming Query From 4b88325745d5e01e15c8646216981612e696c954 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Mon, 6 Jan 2025 20:37:51 +0800 Subject: [PATCH 6/7] fix --- docs/layouts/shortcodes/generated/core_configuration.html | 6 ++++++ .../src/main/java/org/apache/paimon/CoreOptions.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index f60d0ec91009..0d8fe94960f8 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -392,6 +392,12 @@ String Read incremental changes between start timestamp (exclusive) and end timestamp, for example, 't1,t2' means changes between timestamp t1 and timestamp t2. + +
incremental-to-auto-tag
+ (none) + String + Used for "incremental-to-auto-tag" to specify the auto-created tag to reading incremental changes. +
local-merge-buffer-size
(none) diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index 928d29354cbb..005199acd862 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1085,7 +1085,7 @@ public class CoreOptions implements Serializable { .stringType() .noDefaultValue() .withDescription( - "Used for \"incremental-to-auto-tag\" to specify the auto-created tag to reading incremental changes."); + "Used to specify the auto-created tag to reading incremental changes."); public static final ConfigOption END_INPUT_CHECK_PARTITION_EXPIRE = key("end-input.check-partition-expire") From 3a63c0f7b480609a64bc9cf84118d2a53a19e64c Mon Sep 17 00:00:00 2001 From: yuzelin Date: Mon, 6 Jan 2025 20:57:28 +0800 Subject: [PATCH 7/7] fix --- docs/layouts/shortcodes/generated/core_configuration.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 0d8fe94960f8..2a5d7c924c3c 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -396,7 +396,7 @@
incremental-to-auto-tag
(none) String - Used for "incremental-to-auto-tag" to specify the auto-created tag to reading incremental changes. + Used to specify the auto-created tag to reading incremental changes.
local-merge-buffer-size