diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 074f7c0e29e4..53c514d36fd7 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -348,7 +348,7 @@
scan.mode
default

Enum

- Specify the scanning behavior of the source.

Possible values: + Specify the scanning behavior of the source.

Possible values:
scan.plan-sort-partition
@@ -362,6 +362,12 @@ Long Optional snapshot id used in case of "from-snapshot" or "from-snapshot-full" scan mode + +
scan.tag-name
+ (none) + String + Optional tag name used in case of "from-snapshot" scan mode. +
scan.timestamp-millis
(none) diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java index 1995aa5dec81..c08d6e9baca6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java @@ -401,6 +401,12 @@ public class CoreOptions implements Serializable { .withDescription( "Optional snapshot id used in case of \"from-snapshot\" or \"from-snapshot-full\" scan mode"); + public static final ConfigOption SCAN_TAG_NAME = + key("scan.tag-name") + .stringType() + .noDefaultValue() + .withDescription("Optional tag name used in case of \"from-snapshot\" scan mode."); + public static final ConfigOption SCAN_BOUNDED_WATERMARK = key("scan.bounded.watermark") .longType() @@ -887,7 +893,8 @@ public static StartupMode startupMode(Options options) { if (mode == StartupMode.DEFAULT) { if (options.getOptional(SCAN_TIMESTAMP_MILLIS).isPresent()) { return StartupMode.FROM_TIMESTAMP; - } else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent()) { + } else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent() + || options.getOptional(SCAN_TAG_NAME).isPresent()) { return StartupMode.FROM_SNAPSHOT; } else { return StartupMode.LATEST_FULL; @@ -911,6 +918,10 @@ public Long scanSnapshotId() { return options.get(SCAN_SNAPSHOT_ID); } + public String scanTagName() { + return options.get(SCAN_TAG_NAME); + } + public Integer scanManifestParallelism() { return options.get(SCAN_MANIFEST_PARALLELISM); } @@ -1008,7 +1019,7 @@ public enum StartupMode implements DescribedEnum { "default", "Determines actual startup mode according to other table properties. " + "If \"scan.timestamp-millis\" is set the actual startup mode will be \"from-timestamp\", " - + "and if \"scan.snapshot-id\" is set the actual startup mode will be \"from-snapshot\". " + + "and if \"scan.snapshot-id\" or \"scan.tag-name\" is set the actual startup mode will be \"from-snapshot\". 
" + "Otherwise the actual startup mode will be \"latest-full\"."), LATEST_FULL( @@ -1043,10 +1054,10 @@ public enum StartupMode implements DescribedEnum { FROM_SNAPSHOT( "from-snapshot", - "For streaming sources, continuously reads changes " - + "starting from snapshot specified by \"scan.snapshot-id\", " - + "without producing a snapshot at the beginning. For batch sources, " - + "produces a snapshot specified by \"scan.snapshot-id\" but does not read new changes."), + "For streaming sources, continuously reads changes starting from snapshot " + + "specified by \"scan.snapshot-id\", without producing a snapshot at the beginning. " + + "For batch sources, produces a snapshot specified by \"scan.snapshot-id\" " + + "or \"scan.tag-name\" but does not read new changes."), FROM_SNAPSHOT_FULL( "from-snapshot-full", diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 4aa325487056..836e386caeaf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -51,6 +51,7 @@ import java.util.stream.Collectors; import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.Preconditions.checkState; /** Default implementation of {@link FileStoreScan}. 
*/ public abstract class AbstractFileStoreScan implements FileStoreScan { @@ -68,7 +69,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { protected final ScanBucketFilter bucketFilter; private Predicate partitionFilter; - private Long specifiedSnapshotId = null; + private Snapshot specifiedSnapshot = null; private Integer specifiedBucket = null; private List specifiedManifests = null; private ScanKind scanKind = ScanKind.ALL; @@ -142,19 +143,22 @@ public FileStoreScan withPartitionBucket(BinaryRow partition, int bucket) { @Override public FileStoreScan withSnapshot(long snapshotId) { - this.specifiedSnapshotId = snapshotId; - if (specifiedManifests != null) { - throw new IllegalStateException("Cannot set both snapshot id and manifests."); - } + checkState(specifiedManifests == null, "Cannot set both snapshot and manifests."); + this.specifiedSnapshot = snapshotManager.snapshot(snapshotId); + return this; + } + + @Override + public FileStoreScan withSnapshot(Snapshot snapshot) { + checkState(specifiedManifests == null, "Cannot set both snapshot and manifests."); + this.specifiedSnapshot = snapshot; return this; } @Override public FileStoreScan withManifestList(List manifests) { + checkState(specifiedSnapshot == null, "Cannot set both snapshot and manifests."); this.specifiedManifests = manifests; - if (specifiedSnapshotId != null) { - throw new IllegalStateException("Cannot set both snapshot id and manifests."); - } return this; } @@ -209,20 +213,17 @@ private Pair> doPlan( List manifests = specifiedManifests; Snapshot snapshot = null; if (manifests == null) { - Long snapshotId = specifiedSnapshotId; - if (snapshotId == null) { - snapshotId = snapshotManager.latestSnapshotId(); - } - if (snapshotId == null) { + snapshot = + specifiedSnapshot == null + ? 
snapshotManager.latestSnapshot() + : specifiedSnapshot; + if (snapshot == null) { manifests = Collections.emptyList(); } else { - snapshot = snapshotManager.snapshot(snapshotId); manifests = readManifests(snapshot); } } - final List readManifests = manifests; - Iterable entries = ParallellyExecuteUtils.parallelismBatchIterable( files -> @@ -231,7 +232,7 @@ private Pair> doPlan( .flatMap(m -> readManifest.apply(m).stream()) .filter(this::filterByStats) .collect(Collectors.toList()), - readManifests, + manifests, scanManifestParallelism); List files = new ArrayList<>(); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java index 0f1697a26ce8..457d9a0bf580 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java @@ -18,6 +18,7 @@ package org.apache.paimon.operation; +import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.FileKind; @@ -48,6 +49,8 @@ public interface FileStoreScan { FileStoreScan withSnapshot(long snapshotId); + FileStoreScan withSnapshot(Snapshot snapshot); + FileStoreScan withManifestList(List manifests); FileStoreScan withKind(ScanKind scanKind); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index ee7b1a08fcb5..a657019b498e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -31,16 +31,19 @@ import org.apache.paimon.types.RowType; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import 
java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.BUCKET_KEY; import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER; import static org.apache.paimon.CoreOptions.SCAN_MODE; import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID; +import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME; import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS; import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX; import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN; @@ -68,18 +71,8 @@ public static void validateTableSchema(TableSchema schema) { validatePrimaryKeysType(schema.fields(), schema.primaryKeys()); CoreOptions options = new CoreOptions(schema.options()); - if (options.startupMode() == CoreOptions.StartupMode.FROM_TIMESTAMP) { - checkOptionExistInMode( - options, SCAN_TIMESTAMP_MILLIS, CoreOptions.StartupMode.FROM_TIMESTAMP); - checkOptionsConflict(options, SCAN_SNAPSHOT_ID, SCAN_TIMESTAMP_MILLIS); - } else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT - || options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) { - checkOptionExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode()); - checkOptionsConflict(options, SCAN_TIMESTAMP_MILLIS, SCAN_SNAPSHOT_ID); - } else { - checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS, options.startupMode()); - checkOptionNotExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode()); - } + + validateStartupMode(options); if (options.writeMode() == WriteMode.APPEND_ONLY && options.changelogProducer() != CoreOptions.ChangelogProducer.NONE) { @@ -191,6 +184,34 @@ private static void validatePrimaryKeysType(List fields, List } } + private static void validateStartupMode(CoreOptions options) { + if (options.startupMode() == CoreOptions.StartupMode.FROM_TIMESTAMP) { + checkOptionExistInMode( + options, SCAN_TIMESTAMP_MILLIS, CoreOptions.StartupMode.FROM_TIMESTAMP); + checkOptionsConflict( + options, + 
Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME), + Collections.singletonList(SCAN_TIMESTAMP_MILLIS)); + } else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT) { + checkExactOneOptionExistInMode( + options, options.startupMode(), SCAN_SNAPSHOT_ID, SCAN_TAG_NAME); + checkOptionsConflict( + options, + Collections.singletonList(SCAN_TIMESTAMP_MILLIS), + Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME)); + } else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) { + checkOptionExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode()); + checkOptionsConflict( + options, + Arrays.asList(SCAN_TIMESTAMP_MILLIS, SCAN_TAG_NAME), + Collections.singletonList(SCAN_SNAPSHOT_ID)); + } else { + checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS, options.startupMode()); + checkOptionNotExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode()); + checkOptionNotExistInMode(options, SCAN_TAG_NAME, options.startupMode()); + } + } + private static void checkOptionExistInMode( CoreOptions options, ConfigOption option, CoreOptions.StartupMode startupMode) { checkArgument( @@ -209,11 +230,34 @@ private static void checkOptionNotExistInMode( option.key(), startupMode, SCAN_MODE.key())); } - private static void checkOptionsConflict( - CoreOptions options, ConfigOption illegalOption, ConfigOption legalOption) { + private static void checkExactOneOptionExistInMode( + CoreOptions options, + CoreOptions.StartupMode startupMode, + ConfigOption... 
configOptions) { checkArgument( - !options.toConfiguration().contains(illegalOption), + Arrays.stream(configOptions) + .filter(op -> options.toConfiguration().contains(op)) + .count() + == 1, String.format( - "%s must be null when you set %s", illegalOption.key(), legalOption.key())); + "must set only one key in [%s] when you use %s for %s", + concatConfigKeys(Arrays.asList(configOptions)), + startupMode, + SCAN_MODE.key())); + } + + private static void checkOptionsConflict( + CoreOptions options, + List> illegalOptions, + List> legalOptions) { + checkArgument( + illegalOptions.stream().noneMatch(op -> options.toConfiguration().contains(op)), + "[%s] must be null when you set [%s]", + concatConfigKeys(illegalOptions), + concatConfigKeys(legalOptions)); + } + + private static String concatConfigKeys(List> configOptions) { + return configOptions.stream().map(ConfigOption::key).collect(Collectors.joining(",")); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 50f05bacf08b..78eceeb98c8c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -239,15 +239,23 @@ public TableCommitImpl newCommit(String commitUser) { private Optional tryTimeTravel(Options options) { CoreOptions coreOptions = new CoreOptions(options); - Long snapshotId; switch (coreOptions.startupMode()) { case FROM_SNAPSHOT: case FROM_SNAPSHOT_FULL: - snapshotId = coreOptions.scanSnapshotId(); - if (snapshotManager().snapshotExists(snapshotId)) { - long schemaId = snapshotManager().snapshot(snapshotId).schemaId(); - return Optional.of(schemaManager().schema(schemaId).copy(options.toMap())); + if (coreOptions.scanSnapshotId() != null) { + long snapshotId = coreOptions.scanSnapshotId(); + if (snapshotManager().snapshotExists(snapshotId)) { + long schemaId = 
snapshotManager().snapshot(snapshotId).schemaId(); + return Optional.of(schemaManager().schema(schemaId).copy(options.toMap())); + } + } else { + String tagName = coreOptions.scanTagName(); + TagManager tagManager = new TagManager(fileIO, path); + if (tagManager.tagExists(tagName)) { + long schemaId = tagManager.taggedSnapshot(tagName).schemaId(); + return Optional.of(schemaManager().schema(schemaId).copy(options.toMap())); + } } return Optional.empty(); case FROM_TIMESTAMP: diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java index f5943987c93a..2ff81d55d987 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java @@ -35,13 +35,14 @@ import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.table.source.snapshot.StartingScanner; import org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner; +import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner; import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner; -import org.apache.paimon.utils.Preconditions; import java.util.List; import java.util.Optional; import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** An abstraction layer above {@link FileStoreScan} to provide input split generation. */ public abstract class AbstractInnerTableScan implements InnerTableScan { @@ -70,7 +71,7 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { switch (type) { case NORMAL: { - Preconditions.checkArgument( + checkArgument( isStreaming, "Set 'streaming-compact' in batch mode. 
This is unexpected."); return new ContinuousCompactorStartingScanner(); @@ -112,29 +113,20 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { } case FROM_TIMESTAMP: Long startupMillis = options.scanTimestampMills(); - Preconditions.checkNotNull( - startupMillis, - String.format( - "%s can not be null when you use %s for %s", - CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), - CoreOptions.StartupMode.FROM_TIMESTAMP, - CoreOptions.SCAN_MODE.key())); return isStreaming ? new ContinuousFromTimestampStartingScanner(startupMillis) : new StaticFromTimestampStartingScanner(startupMillis); case FROM_SNAPSHOT: + if (options.scanSnapshotId() != null) { + return isStreaming + ? new ContinuousFromSnapshotStartingScanner(options.scanSnapshotId()) + : new StaticFromSnapshotStartingScanner(options.scanSnapshotId()); + } else { + checkArgument(!isStreaming, "Cannot scan from tag in streaming mode."); + return new StaticFromTagStartingScanner(options().scanTagName()); + } case FROM_SNAPSHOT_FULL: - Long snapshotId = options.scanSnapshotId(); - Preconditions.checkNotNull( - snapshotId, - String.format( - "%s can not be null when you use %s for %s", - CoreOptions.SCAN_SNAPSHOT_ID.key(), - startupMode, - CoreOptions.SCAN_MODE.key())); - return isStreaming && startupMode == CoreOptions.StartupMode.FROM_SNAPSHOT - ? 
new ContinuousFromSnapshotStartingScanner(snapshotId) - : new StaticFromSnapshotStartingScanner(snapshotId); + return new StaticFromSnapshotStartingScanner(options.scanSnapshotId()); default: throw new UnsupportedOperationException( "Unknown startup mode " + startupMode.name()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java index f0e4bc3cb6cd..a602eedbb9af 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java @@ -39,6 +39,8 @@ public interface SnapshotReader { SnapshotReader withSnapshot(long snapshotId); + SnapshotReader withSnapshot(Snapshot snapshot); + SnapshotReader withFilter(Predicate predicate); SnapshotReader withKind(ScanKind scanKind); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 42f5cc7273d5..c829364340dd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -93,6 +93,12 @@ public SnapshotReader withSnapshot(long snapshotId) { return this; } + @Override + public SnapshotReader withSnapshot(Snapshot snapshot) { + scan.withSnapshot(snapshot); + return this; + } + @Override public SnapshotReader withFilter(Predicate predicate) { List partitionKeys = tableSchema.partitionKeys(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java new file mode 100644 index 000000000000..32b8dc4e3d71 --- /dev/null +++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.snapshot; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.operation.ScanKind; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +/** + * {@link StartingScanner} for the {@link CoreOptions.StartupMode#FROM_SNAPSHOT} startup mode of a + * batch read when a tag name is given instead of a snapshot id. 
+ */ +public class StaticFromTagStartingScanner implements StartingScanner { + + private final String tagName; + + public StaticFromTagStartingScanner(String tagName) { + this.tagName = tagName; + } + + @Override + public Result scan(SnapshotManager snapshotManager, SnapshotReader snapshotReader) { + TagManager tagManager = + new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); + Snapshot snapshot = tagManager.taggedSnapshot(tagName); + + return StartingScanner.fromPlan( + snapshotReader.withKind(ScanKind.ALL).withSnapshot(snapshot).read()); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index 6b167ff0d669..8dd94df14ade 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -19,6 +19,7 @@ package org.apache.paimon.table.system; import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryString; @@ -195,6 +196,11 @@ public SnapshotReader withSnapshot(long snapshotId) { return this; } + public SnapshotReader withSnapshot(Snapshot snapshot) { + snapshotReader.withSnapshot(snapshot); + return this; + } + public SnapshotReader withFilter(Predicate predicate) { convert(predicate).ifPresent(snapshotReader::withFilter); return this; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java new file mode 100644 index 000000000000..eb524625b201 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache 
Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.snapshot; + +import org.apache.paimon.table.sink.StreamTableCommit; +import org.apache.paimon.table.sink.StreamTableWrite; +import org.apache.paimon.testutils.assertj.AssertionUtils; +import org.apache.paimon.utils.SnapshotManager; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link StaticFromTagStartingScanner}. 
*/ +public class StaticFromTagStartingScannerTest extends ScannerTestBase { + + @Test + public void testScan() throws Exception { + SnapshotManager snapshotManager = table.snapshotManager(); + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + write.write(rowData(1, 10, 100L)); + write.write(rowData(1, 20, 200L)); + commit.commit(0, write.prepareCommit(true, 0)); + + write.write(rowData(2, 30, 101L)); + write.write(rowData(2, 40, 201L)); + commit.commit(1, write.prepareCommit(true, 1)); + + write.write(rowData(3, 50, 500L)); + write.write(rowData(3, 60, 600L)); + commit.commit(2, write.prepareCommit(true, 2)); + + table.createTag("tag2", 2); + + StaticFromTagStartingScanner scanner = new StaticFromTagStartingScanner("tag2"); + StartingScanner.ScannedResult result = + (StartingScanner.ScannedResult) scanner.scan(snapshotManager, snapshotReader); + assertThat(result.currentSnapshotId()).isEqualTo(2); + assertThat(getResult(table.newRead(), toSplits(result.splits()))) + .hasSameElementsAs( + Arrays.asList("+I 1|10|100", "+I 1|20|200", "+I 2|30|101", "+I 2|40|201")); + + write.close(); + commit.close(); + } + + @Test + public void testNonExistingTag() { + SnapshotManager snapshotManager = table.snapshotManager(); + StaticFromTagStartingScanner scanner = new StaticFromTagStartingScanner("non-existing"); + assertThatThrownBy(() -> scanner.scan(snapshotManager, snapshotReader)) + .satisfies( + AssertionUtils.anyCauseMatches( + IllegalArgumentException.class, + "Tag 'non-existing' doesn't exist")); + } +} diff --git a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 59cc98ce4d16..8f601761479d 100644 --- a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ 
b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -52,7 +52,7 @@ public void testOverwriteEmpty() { } @Test - public void testTimeTravelRead() throws InterruptedException { + public void testTimeTravelRead() throws Exception { batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)"); long time1 = System.currentTimeMillis(); @@ -67,6 +67,8 @@ public void testTimeTravelRead() throws InterruptedException { Thread.sleep(10); batchSql("INSERT INTO T VALUES (7, 77, 777), (8, 88, 888)"); + paimonTable("T").createTag("tag2", 2); + assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='1') */")) .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222)); @@ -125,9 +127,7 @@ public void testTimeTravelRead() throws InterruptedException { time3))) .hasRootCauseInstanceOf(IllegalArgumentException.class) .hasRootCauseMessage( - "%s must be null when you set %s", - CoreOptions.SCAN_SNAPSHOT_ID.key(), - CoreOptions.SCAN_TIMESTAMP_MILLIS.key()); + "[scan.snapshot-id,scan.tag-name] must be null when you set [scan.timestamp-millis]"); assertThatThrownBy( () -> @@ -137,5 +137,18 @@ public void testTimeTravelRead() throws InterruptedException { .hasRootCauseMessage( "%s must be null when you use latest-full for scan.mode", CoreOptions.SCAN_SNAPSHOT_ID.key()); + + // travel to tag + assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag2') */")) + .containsExactlyInAnyOrder( + Row.of(1, 11, 111), + Row.of(2, 22, 222), + Row.of(3, 33, 333), + Row.of(4, 44, 444)); + + assertThatThrownBy( + () -> batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='unknown') */")) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Tag 'unknown' doesn't exist."); } } diff --git a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java index 
0f656904074d..d14e0649508f 100644 --- a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java +++ b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java @@ -19,8 +19,10 @@ package org.apache.paimon.flink; import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.table.Table; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; @@ -29,11 +31,6 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentImpl; -import org.apache.flink.table.catalog.Catalog; -import org.apache.flink.table.catalog.CatalogBaseTable; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateCatalogOperation; @@ -127,11 +124,11 @@ protected CloseableIterator streamSqlIter(String query, Object... 
args) { return sEnv.executeSql(String.format(query, args)).collect(); } - protected CatalogTable table(String tableName) throws TableNotExistException { - Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get(); - CatalogBaseTable table = - catalog.getTable(new ObjectPath(catalog.getDefaultDatabase(), tableName)); - return (CatalogTable) table; + protected Table paimonTable(String tableName) + throws org.apache.paimon.catalog.Catalog.TableNotExistException { + FlinkCatalog flinkCatalog = (FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get(); + org.apache.paimon.catalog.Catalog catalog = flinkCatalog.catalog(); + return catalog.getTable(Identifier.create(tEnv.getCurrentDatabase(), tableName)); } protected Path getTableDirectory(String tableName) { diff --git a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 59cc98ce4d16..8f601761479d 100644 --- a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -52,7 +52,7 @@ public void testOverwriteEmpty() { } @Test - public void testTimeTravelRead() throws InterruptedException { + public void testTimeTravelRead() throws Exception { batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)"); long time1 = System.currentTimeMillis(); @@ -67,6 +67,8 @@ public void testTimeTravelRead() throws InterruptedException { Thread.sleep(10); batchSql("INSERT INTO T VALUES (7, 77, 777), (8, 88, 888)"); + paimonTable("T").createTag("tag2", 2); + assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='1') */")) .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222)); @@ -125,9 +127,7 @@ public void testTimeTravelRead() throws InterruptedException { time3))) 
.hasRootCauseInstanceOf(IllegalArgumentException.class) .hasRootCauseMessage( - "%s must be null when you set %s", - CoreOptions.SCAN_SNAPSHOT_ID.key(), - CoreOptions.SCAN_TIMESTAMP_MILLIS.key()); + "[scan.snapshot-id,scan.tag-name] must be null when you set [scan.timestamp-millis]"); assertThatThrownBy( () -> @@ -137,5 +137,18 @@ public void testTimeTravelRead() throws InterruptedException { .hasRootCauseMessage( "%s must be null when you use latest-full for scan.mode", CoreOptions.SCAN_SNAPSHOT_ID.key()); + + // travel to tag + assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag2') */")) + .containsExactlyInAnyOrder( + Row.of(1, 11, 111), + Row.of(2, 22, 222), + Row.of(3, 33, 333), + Row.of(4, 44, 444)); + + assertThatThrownBy( + () -> batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='unknown') */")) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Tag 'unknown' doesn't exist."); } } diff --git a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java index 0f656904074d..d14e0649508f 100644 --- a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java +++ b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java @@ -19,8 +19,10 @@ package org.apache.paimon.flink; import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.table.Table; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; @@ -29,11 +31,6 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentImpl; -import 
org.apache.flink.table.catalog.Catalog; -import org.apache.flink.table.catalog.CatalogBaseTable; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateCatalogOperation; @@ -127,11 +124,11 @@ protected CloseableIterator streamSqlIter(String query, Object... args) { return sEnv.executeSql(String.format(query, args)).collect(); } - protected CatalogTable table(String tableName) throws TableNotExistException { - Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get(); - CatalogBaseTable table = - catalog.getTable(new ObjectPath(catalog.getDefaultDatabase(), tableName)); - return (CatalogTable) table; + protected Table paimonTable(String tableName) + throws org.apache.paimon.catalog.Catalog.TableNotExistException { + FlinkCatalog flinkCatalog = (FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get(); + org.apache.paimon.catalog.Catalog catalog = flinkCatalog.catalog(); + return catalog.getTable(Identifier.create(tEnv.getCurrentDatabase(), tableName)); } protected Path getTableDirectory(String tableName) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 4c8e1967d4a5..88eb01619a3c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -47,7 +47,7 @@ public void testOverwriteEmpty() { } @Test - public void testTimeTravelRead() throws InterruptedException { + public void testTimeTravelRead() throws Exception { batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 
222)"); long time1 = System.currentTimeMillis(); @@ -62,6 +62,8 @@ public void testTimeTravelRead() throws InterruptedException { Thread.sleep(10); batchSql("INSERT INTO T VALUES (7, 77, 777), (8, 88, 888)"); + paimonTable("T").createTag("tag2", 2); + assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='1') */")) .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222)); @@ -120,9 +122,7 @@ public void testTimeTravelRead() throws InterruptedException { time3))) .hasRootCauseInstanceOf(IllegalArgumentException.class) .hasRootCauseMessage( - "%s must be null when you set %s", - CoreOptions.SCAN_SNAPSHOT_ID.key(), - CoreOptions.SCAN_TIMESTAMP_MILLIS.key()); + "[scan.snapshot-id,scan.tag-name] must be null when you set [scan.timestamp-millis]"); assertThatThrownBy( () -> @@ -132,5 +132,18 @@ public void testTimeTravelRead() throws InterruptedException { .hasRootCauseMessage( "%s must be null when you use latest-full for scan.mode", CoreOptions.SCAN_SNAPSHOT_ID.key()); + + // travel to tag + assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag2') */")) + .containsExactlyInAnyOrder( + Row.of(1, 11, 111), + Row.of(2, 22, 222), + Row.of(3, 33, 333), + Row.of(4, 44, 444)); + + assertThatThrownBy( + () -> batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='unknown') */")) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Tag 'unknown' doesn't exist."); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java index ba51700c7c91..c15a6841aea1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java @@ -19,9 +19,11 @@ package org.apache.paimon.flink; import org.apache.paimon.Snapshot; +import 
org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.table.Table; import org.apache.paimon.utils.BlockingIterator; import org.apache.paimon.utils.SnapshotManager; @@ -141,12 +143,22 @@ protected BlockingIterator streamSqlBlockIter(String query, Object... } protected CatalogTable table(String tableName) throws TableNotExistException { - Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get(); + Catalog catalog = flinkCatalog(); CatalogBaseTable table = catalog.getTable(new ObjectPath(catalog.getDefaultDatabase(), tableName)); return (CatalogTable) table; } + protected Table paimonTable(String tableName) + throws org.apache.paimon.catalog.Catalog.TableNotExistException { + org.apache.paimon.catalog.Catalog catalog = flinkCatalog().catalog(); + return catalog.getTable(Identifier.create(tEnv.getCurrentDatabase(), tableName)); + } + + private FlinkCatalog flinkCatalog() { + return (FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get(); + } + protected Path getTableDirectory(String tableName) { return new Path( new File(path, String.format("%s.db/%s", tEnv.getCurrentDatabase(), tableName)) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 8e57bee4db35..3e93d5d8e110 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -215,21 +215,17 @@ public SparkTable loadTable(Identifier ident) throws NoSuchTableException { */ public SparkTable loadTable(Identifier ident, String version) throws NoSuchTableException { Table table = loadAndRequireDataTable(ident); - long snapshotId; + Options dynamicOptions = new Options(); - try { - 
snapshotId = Long.parseUnsignedLong(version); - } catch (NumberFormatException e) { - throw new IllegalArgumentException( - String.format( - "Version for time travel should be a LONG value representing snapshot id but was '%s'.", - version), - e); + if (version.chars().allMatch(Character::isDigit)) { + long snapshotId = Long.parseUnsignedLong(version); + LOG.info("Time travel to snapshot '{}'.", snapshotId); + dynamicOptions.set(CoreOptions.SCAN_SNAPSHOT_ID, snapshotId); + } else { + LOG.info("Time travel to tag '{}'.", version); + dynamicOptions.set(CoreOptions.SCAN_TAG_NAME, version); } - LOG.info("Time travel target snapshot id is {}.", snapshotId); - - Options dynamicOptions = new Options().set(CoreOptions.SCAN_SNAPSHOT_ID, snapshotId); return new SparkTable(table.copy(dynamicOptions.toMap())); } diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java index 50985c9c9bf3..128401b711d1 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java @@ -168,11 +168,14 @@ protected static void createTable(String tableName) { tableName)); } + protected static FileStoreTable getTable(String tableName) { + return FileStoreTableFactory.create( + LocalFileIO.create(), + new Path(warehousePath, String.format("default.db/%s", tableName))); + } + protected static void writeTable(String tableName, GenericRow... 
rows) throws Exception { - FileStoreTable fileStoreTable = - FileStoreTableFactory.create( - LocalFileIO.create(), - new Path(warehousePath, String.format("default.db/%s", tableName))); + FileStoreTable fileStoreTable = getTable(tableName); StreamWriteBuilder streamWriteBuilder = fileStoreTable.newStreamWriteBuilder(); StreamTableWrite writer = streamWriteBuilder.newWrite(); StreamTableCommit commit = streamWriteBuilder.newCommit(); diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java index e81d9ba913f8..6c052c4bf17b 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java @@ -163,24 +163,52 @@ public void testTravelToNonExistedTimestamp() { } @Test - public void testIllegalVersionString() { + public void testUnsupportedSystemTableTimeTravel() { spark.sql("CREATE TABLE t (k INT, v STRING)"); - assertThatThrownBy(() -> spark.sql("SELECT * FROM t VERSION AS OF '1.5'")) + assertThatThrownBy(() -> spark.sql("SELECT * FROM `t$snapshots` VERSION AS OF 1")) .satisfies( AssertionUtils.anyCauseMatches( - IllegalArgumentException.class, - "Version for time travel should be a LONG value representing snapshot id but was '1.5'.")); + UnsupportedOperationException.class, + "Only DataTable supports time travel but given table type is 'org.apache.paimon.table.system.SnapshotsTable'")); } @Test - public void testUnsupportedSystemTableTimeTravel() { + public void testTravelToTag() throws Exception { spark.sql("CREATE TABLE t (k INT, v STRING)"); - assertThatThrownBy(() -> spark.sql("SELECT * FROM `t$snapshots` VERSION AS OF 1")) + // snapshot 1 + writeTable( + "t", + GenericRow.of(1, BinaryString.fromString("Hello")), + GenericRow.of(2, BinaryString.fromString("Paimon"))); 
+ + // snapshot 2 + writeTable( + "t", + GenericRow.of(3, BinaryString.fromString("Test")), + GenericRow.of(4, BinaryString.fromString("Case"))); + + // snapshot 3 + writeTable( + "t", + GenericRow.of(5, BinaryString.fromString("Time")), + GenericRow.of(6, BinaryString.fromString("Travel"))); + + getTable("t").createTag("tag2", 2); + + // time travel to tag2 + assertThat(spark.sql("SELECT * FROM t VERSION AS OF 'tag2'").collectAsList().toString()) + .isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]"); + } + + @Test + public void testTravelToNonExistingTag() { + spark.sql("CREATE TABLE t (k INT, v STRING)"); + assertThatThrownBy( + () -> spark.sql("SELECT * FROM t VERSION AS OF 'unknown'").collectAsList()) .satisfies( AssertionUtils.anyCauseMatches( - UnsupportedOperationException.class, - "Only DataTable supports time travel but given table type is 'org.apache.paimon.table.system.SnapshotsTable'")); + IllegalArgumentException.class, "Tag 'unknown' doesn't exist.")); } }