From 736f1698bffd885cadec7bf17a1fbce6008e0be2 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Tue, 20 Jun 2023 16:20:19 +0800 Subject: [PATCH 1/6] [core] Support time travel to tag --- .../generated/core_configuration.html | 8 +- .../java/org/apache/paimon/CoreOptions.java | 23 +++++- .../operation/AbstractFileStoreScan.java | 41 +++++++--- .../paimon/operation/FileStoreScan.java | 3 + .../paimon/table/AbstractFileStoreTable.java | 11 ++- .../table/source/AbstractInnerTableScan.java | 5 ++ .../table/source/snapshot/SnapshotReader.java | 2 + .../source/snapshot/SnapshotReaderImpl.java | 6 ++ .../StaticFromTagStartingScanner.java | 48 ++++++++++++ .../paimon/table/system/AuditLogTable.java | 6 ++ .../StaticFromTagStartingScannerTest.java | 78 +++++++++++++++++++ .../paimon/flink/BatchFileStoreITCase.java | 17 +++- .../paimon/flink/CatalogITCaseBase.java | 14 +++- .../org/apache/paimon/spark/SparkCatalog.java | 20 ++--- .../paimon/spark/SparkReadTestBase.java | 11 ++- .../paimon/spark/SparkTimeTravelITCase.java | 44 +++++++++-- 16 files changed, 293 insertions(+), 44 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 074f7c0e29e4..281df20ffc8f 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -348,7 +348,7 @@
scan.mode
default

Enum

- Specify the scanning behavior of the source.

Possible values: + Specify the scanning behavior of the source.

Possible values:
scan.plan-sort-partition
@@ -362,6 +362,12 @@ Long Optional snapshot id used in case of "from-snapshot" or "from-snapshot-full" scan mode + +
scan.tag-name
+ (none) + String + Optional tag name used in case of "from-tag" scan mode. +
scan.timestamp-millis
(none) diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java index 1995aa5dec81..bca215459841 100644 --- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java @@ -401,6 +401,12 @@ public class CoreOptions implements Serializable { .withDescription( "Optional snapshot id used in case of \"from-snapshot\" or \"from-snapshot-full\" scan mode"); + public static final ConfigOption SCAN_TAG_NAME = + key("scan.tag-name") + .stringType() + .noDefaultValue() + .withDescription("Optional tag name used in case of \"from-tag\" scan mode."); + public static final ConfigOption SCAN_BOUNDED_WATERMARK = key("scan.bounded.watermark") .longType() @@ -889,6 +895,8 @@ public static StartupMode startupMode(Options options) { return StartupMode.FROM_TIMESTAMP; } else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent()) { return StartupMode.FROM_SNAPSHOT; + } else if (options.getOptional(SCAN_TAG_NAME).isPresent()) { + return StartupMode.FROM_TAG; } else { return StartupMode.LATEST_FULL; } @@ -911,6 +919,10 @@ public Long scanSnapshotId() { return options.get(SCAN_SNAPSHOT_ID); } + public String scanTagName() { + return options.get(SCAN_TAG_NAME); + } + public Integer scanManifestParallelism() { return options.get(SCAN_MANIFEST_PARALLELISM); } @@ -1007,8 +1019,9 @@ public enum StartupMode implements DescribedEnum { DEFAULT( "default", "Determines actual startup mode according to other table properties. " - + "If \"scan.timestamp-millis\" is set the actual startup mode will be \"from-timestamp\", " - + "and if \"scan.snapshot-id\" is set the actual startup mode will be \"from-snapshot\". " + + "If \"scan.timestamp-millis\" is set the actual startup mode will be \"from-timestamp\"; " + + "if \"scan.snapshot-id\" is set the actual startup mode will be \"from-snapshot\"; " + + "and if \"scan.tag-name\" ise set the actual startup mode will be \"from-tag\". " + "Otherwise the actual startup mode will be \"latest-full\"."), LATEST_FULL( @@ -1052,7 +1065,11 @@ public enum StartupMode implements DescribedEnum { "from-snapshot-full", "For streaming sources, produces from snapshot specified by \"scan.snapshot-id\" " + "on the table upon first startup, and continuously reads changes. For batch sources, " - + "produces a snapshot specified by \"scan.snapshot-id\" but does not read new changes."); + + "produces a snapshot specified by \"scan.snapshot-id\" but does not read new changes."), + + FROM_TAG( + "from-tag", + "This mode is only for batch sources. It produces a snapshot based on the given tag."); private final String value; private final String description; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 4aa325487056..ed193edd3da6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -51,6 +51,7 @@ import java.util.stream.Collectors; import static org.apache.paimon.utils.Preconditions.checkArgument; +import static org.apache.paimon.utils.Preconditions.checkState; /** Default implementation of {@link FileStoreScan}. */ public abstract class AbstractFileStoreScan implements FileStoreScan { @@ -69,6 +70,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private Predicate partitionFilter; private Long specifiedSnapshotId = null; + private Snapshot specifiedSnapshot = null; private Integer specifiedBucket = null; private List specifiedManifests = null; private ScanKind scanKind = ScanKind.ALL; @@ -143,18 +145,24 @@ public FileStoreScan withPartitionBucket(BinaryRow partition, int bucket) { @Override public FileStoreScan withSnapshot(long snapshotId) { this.specifiedSnapshotId = snapshotId; - if (specifiedManifests != null) { - throw new IllegalStateException("Cannot set both snapshot id and manifests."); - } + checkState(specifiedSnapshot == null, "Cannot set both snapshot id and snapshot."); + checkState(specifiedManifests == null, "Cannot set both snapshot id and manifests."); + return this; + } + + @Override + public FileStoreScan withSnapshot(Snapshot snapshot) { + this.specifiedSnapshot = snapshot; + checkState(specifiedSnapshotId == null, "Cannot set both snapshot and snapshot id."); + checkState(specifiedManifests == null, "Cannot set both snapshot and manifests."); return this; } @Override public FileStoreScan withManifestList(List manifests) { this.specifiedManifests = manifests; - if (specifiedSnapshotId != null) { - throw new IllegalStateException("Cannot set both snapshot id and manifests."); - } + checkState(specifiedSnapshot == null, "Cannot set both manifests and snapshot."); + checkState(specifiedSnapshotId == null, "Cannot set both manifests and snapshot id."); return this; } @@ -209,14 +217,10 @@ private Pair> doPlan( List manifests = specifiedManifests; Snapshot snapshot = null; if (manifests == null) { - Long snapshotId = specifiedSnapshotId; - if (snapshotId == null) { - snapshotId = snapshotManager.latestSnapshotId(); - } - if (snapshotId == null) { + snapshot = determineSnapshot(); + if (snapshot == null) { manifests = Collections.emptyList(); } else { - snapshot = snapshotManager.snapshot(snapshotId); manifests = readManifests(snapshot); } } @@ -266,6 +270,19 @@ private Pair> doPlan( return Pair.of(snapshot, files); } + @Nullable + private Snapshot determineSnapshot() { + if (specifiedSnapshot != null) { + return specifiedSnapshot; + } + + Long snapshotId = specifiedSnapshotId; + if (snapshotId == null) { + snapshotId = snapshotManager.latestSnapshotId(); + } + return snapshotId == null ? null : snapshotManager.snapshot(snapshotId); + } + private List readManifests(Snapshot snapshot) { switch (scanKind) { case ALL: diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java index 0f1697a26ce8..457d9a0bf580 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java @@ -18,6 +18,7 @@ package org.apache.paimon.operation; +import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.FileKind; @@ -48,6 +49,8 @@ public interface FileStoreScan { FileStoreScan withSnapshot(long snapshotId); + FileStoreScan withSnapshot(Snapshot snapshot); + FileStoreScan withManifestList(List manifests); FileStoreScan withKind(ScanKind scanKind); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 50f05bacf08b..d6f3a6fcbfbc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -239,12 +239,11 @@ public TableCommitImpl newCommit(String commitUser) { private Optional tryTimeTravel(Options options) { CoreOptions coreOptions = new CoreOptions(options); - Long snapshotId; switch (coreOptions.startupMode()) { case FROM_SNAPSHOT: case FROM_SNAPSHOT_FULL: - snapshotId = coreOptions.scanSnapshotId(); + long snapshotId = coreOptions.scanSnapshotId(); if (snapshotManager().snapshotExists(snapshotId)) { long schemaId = snapshotManager().snapshot(snapshotId).schemaId(); return Optional.of(schemaManager().schema(schemaId).copy(options.toMap())); @@ -259,6 +258,14 @@ private Optional tryTimeTravel(Options options) { return Optional.of(schemaManager().schema(schemaId).copy(options.toMap())); } return Optional.empty(); + case FROM_TAG: + String tagName = coreOptions.scanTagName(); + TagManager tagManager = new TagManager(fileIO, path); + if (tagManager.tagExists(tagName)) { + long schemaId = tagManager.taggedSnapshot(tagName).schemaId(); + return Optional.of(schemaManager().schema(schemaId).copy(options.toMap())); + } + return Optional.empty(); default: return Optional.empty(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java index f5943987c93a..1fdc693549f3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java @@ -35,6 +35,7 @@ import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.paimon.table.source.snapshot.StartingScanner; import org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner; +import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner; import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner; import org.apache.paimon.utils.Preconditions; @@ -135,6 +136,10 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { return isStreaming && startupMode == CoreOptions.StartupMode.FROM_SNAPSHOT ? new ContinuousFromSnapshotStartingScanner(snapshotId) : new StaticFromSnapshotStartingScanner(snapshotId); + case FROM_TAG: + Preconditions.checkArgument( + !isStreaming, "Cannot scan from tag in streaming mode."); + return new StaticFromTagStartingScanner(options().scanTagName()); default: throw new UnsupportedOperationException( "Unknown startup mode " + startupMode.name()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java index f0e4bc3cb6cd..a602eedbb9af 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java @@ -39,6 +39,8 @@ public interface SnapshotReader { SnapshotReader withSnapshot(long snapshotId); + SnapshotReader withSnapshot(Snapshot snapshot); + SnapshotReader withFilter(Predicate predicate); SnapshotReader withKind(ScanKind scanKind); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index 42f5cc7273d5..c829364340dd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -93,6 +93,12 @@ public SnapshotReader withSnapshot(long snapshotId) { return this; } + @Override + public SnapshotReader withSnapshot(Snapshot snapshot) { + scan.withSnapshot(snapshot); + return this; + } + @Override public SnapshotReader withFilter(Predicate predicate) { List partitionKeys = tableSchema.partitionKeys(); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java new file mode 100644 index 000000000000..32b8dc4e3d71 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.snapshot; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; +import org.apache.paimon.operation.ScanKind; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +/** + * {@link StartingScanner} for the {@link CoreOptions.StartupMode#FROM_TAG} startup mode of a batch + * read. + */ +public class StaticFromTagStartingScanner implements StartingScanner { + + private final String tagName; + + public StaticFromTagStartingScanner(String tagName) { + this.tagName = tagName; + } + + @Override + public Result scan(SnapshotManager snapshotManager, SnapshotReader snapshotReader) { + TagManager tagManager = + new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); + Snapshot snapshot = tagManager.taggedSnapshot(tagName); + + return StartingScanner.fromPlan( + snapshotReader.withKind(ScanKind.ALL).withSnapshot(snapshot).read()); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index 6b167ff0d669..8dd94df14ade 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -19,6 +19,7 @@ package org.apache.paimon.table.system; import org.apache.paimon.CoreOptions; +import org.apache.paimon.Snapshot; import org.apache.paimon.consumer.ConsumerManager; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryString; @@ -195,6 +196,11 @@ public SnapshotReader withSnapshot(long snapshotId) { return this; } + public SnapshotReader withSnapshot(Snapshot snapshot) { + snapshotReader.withSnapshot(snapshot); + return this; + } + public SnapshotReader withFilter(Predicate predicate) { convert(predicate).ifPresent(snapshotReader::withFilter); return this; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java new file mode 100644 index 000000000000..eb524625b201 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScannerTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.source.snapshot; + +import org.apache.paimon.table.sink.StreamTableCommit; +import org.apache.paimon.table.sink.StreamTableWrite; +import org.apache.paimon.testutils.assertj.AssertionUtils; +import org.apache.paimon.utils.SnapshotManager; + +import org.junit.jupiter.api.Test; + +import java.util.Arrays; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Tests for {@link StaticFromTagStartingScanner}. */ +public class StaticFromTagStartingScannerTest extends ScannerTestBase { + + @Test + public void testScan() throws Exception { + SnapshotManager snapshotManager = table.snapshotManager(); + StreamTableWrite write = table.newWrite(commitUser); + StreamTableCommit commit = table.newCommit(commitUser); + + write.write(rowData(1, 10, 100L)); + write.write(rowData(1, 20, 200L)); + commit.commit(0, write.prepareCommit(true, 0)); + + write.write(rowData(2, 30, 101L)); + write.write(rowData(2, 40, 201L)); + commit.commit(1, write.prepareCommit(true, 1)); + + write.write(rowData(3, 50, 500L)); + write.write(rowData(3, 60, 600L)); + commit.commit(2, write.prepareCommit(true, 2)); + + table.createTag("tag2", 2); + + StaticFromTagStartingScanner scanner = new StaticFromTagStartingScanner("tag2"); + StartingScanner.ScannedResult result = + (StartingScanner.ScannedResult) scanner.scan(snapshotManager, snapshotReader); + assertThat(result.currentSnapshotId()).isEqualTo(2); + assertThat(getResult(table.newRead(), toSplits(result.splits()))) + .hasSameElementsAs( + Arrays.asList("+I 1|10|100", "+I 1|20|200", "+I 2|30|101", "+I 2|40|201")); + + write.close(); + commit.close(); + } + + @Test + public void testNonExistingTag() { + SnapshotManager snapshotManager = table.snapshotManager(); + StaticFromTagStartingScanner scanner = new StaticFromTagStartingScanner("non-existing"); + assertThatThrownBy(() -> scanner.scan(snapshotManager, snapshotReader)) + .satisfies( + AssertionUtils.anyCauseMatches( + IllegalArgumentException.class, + "Tag 'non-existing' doesn't exist")); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 4c8e1967d4a5..b7b9b0eccc65 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -47,7 +47,7 @@ public void testOverwriteEmpty() { } @Test - public void testTimeTravelRead() throws InterruptedException { + public void testTimeTravelRead() throws Exception { batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)"); long time1 = System.currentTimeMillis(); @@ -62,6 +62,8 @@ public void testTimeTravelRead() throws InterruptedException { Thread.sleep(10); batchSql("INSERT INTO T VALUES (7, 77, 777), (8, 88, 888)"); + paimonTable("T").createTag("tag2", 2); + assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='1') */")) .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222)); @@ -132,5 +134,18 @@ public void testTimeTravelRead() throws InterruptedException { .hasRootCauseMessage( "%s must be null when you use latest-full for scan.mode", CoreOptions.SCAN_SNAPSHOT_ID.key()); + + // travel to tag + assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag2') */")) + .containsExactlyInAnyOrder( + Row.of(1, 11, 111), + Row.of(2, 22, 222), + Row.of(3, 33, 333), + Row.of(4, 44, 444)); + + assertThatThrownBy( + () -> batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='unknown') */")) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Tag 'unknown' doesn't exist."); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java index ba51700c7c91..c15a6841aea1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java @@ -19,9 +19,11 @@ package org.apache.paimon.flink; import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.util.AbstractTestBase; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.table.Table; import org.apache.paimon.utils.BlockingIterator; import org.apache.paimon.utils.SnapshotManager; @@ -141,12 +143,22 @@ protected BlockingIterator streamSqlBlockIter(String query, Object... } protected CatalogTable table(String tableName) throws TableNotExistException { - Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get(); + Catalog catalog = flinkCatalog(); CatalogBaseTable table = catalog.getTable(new ObjectPath(catalog.getDefaultDatabase(), tableName)); return (CatalogTable) table; } + protected Table paimonTable(String tableName) + throws org.apache.paimon.catalog.Catalog.TableNotExistException { + org.apache.paimon.catalog.Catalog catalog = flinkCatalog().catalog(); + return catalog.getTable(Identifier.create(tEnv.getCurrentDatabase(), tableName)); + } + + private FlinkCatalog flinkCatalog() { + return (FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get(); + } + protected Path getTableDirectory(String tableName) { return new Path( new File(path, String.format("%s.db/%s", tEnv.getCurrentDatabase(), tableName)) diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java index 8e57bee4db35..3e93d5d8e110 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java @@ -215,21 +215,17 @@ public SparkTable loadTable(Identifier ident) throws NoSuchTableException { */ public SparkTable loadTable(Identifier ident, String version) throws NoSuchTableException { Table table = loadAndRequireDataTable(ident); - long snapshotId; + Options dynamicOptions = new Options(); - try { - snapshotId = Long.parseUnsignedLong(version); - } catch (NumberFormatException e) { - throw new IllegalArgumentException( - String.format( - "Version for time travel should be a LONG value representing snapshot id but was '%s'.", - version), - e); + if (version.chars().allMatch(Character::isDigit)) { + long snapshotId = Long.parseUnsignedLong(version); + LOG.info("Time travel to snapshot '{}'.", snapshotId); + dynamicOptions.set(CoreOptions.SCAN_SNAPSHOT_ID, snapshotId); + } else { + LOG.info("Time travel to tag '{}'.", version); + dynamicOptions.set(CoreOptions.SCAN_TAG_NAME, version); } - LOG.info("Time travel target snapshot id is {}.", snapshotId); - - Options dynamicOptions = new Options().set(CoreOptions.SCAN_SNAPSHOT_ID, snapshotId); return new SparkTable(table.copy(dynamicOptions.toMap())); } diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java index 50985c9c9bf3..128401b711d1 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkReadTestBase.java @@ -168,11 +168,14 @@ protected static void createTable(String tableName) { tableName)); } + protected static FileStoreTable getTable(String tableName) { + return FileStoreTableFactory.create( + LocalFileIO.create(), + new Path(warehousePath, String.format("default.db/%s", tableName))); + } + protected static void writeTable(String tableName, GenericRow... rows) throws Exception { - FileStoreTable fileStoreTable = - FileStoreTableFactory.create( - LocalFileIO.create(), - new Path(warehousePath, String.format("default.db/%s", tableName))); + FileStoreTable fileStoreTable = getTable(tableName); StreamWriteBuilder streamWriteBuilder = fileStoreTable.newStreamWriteBuilder(); StreamTableWrite writer = streamWriteBuilder.newWrite(); StreamTableCommit commit = streamWriteBuilder.newCommit(); diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java index e81d9ba913f8..6c052c4bf17b 100644 --- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java +++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkTimeTravelITCase.java @@ -163,24 +163,52 @@ public void testTravelToNonExistedTimestamp() { } @Test - public void testIllegalVersionString() { + public void testUnsupportedSystemTableTimeTravel() { spark.sql("CREATE TABLE t (k INT, v STRING)"); - assertThatThrownBy(() -> spark.sql("SELECT * FROM t VERSION AS OF '1.5'")) + assertThatThrownBy(() -> spark.sql("SELECT * FROM `t$snapshots` VERSION AS OF 1")) .satisfies( AssertionUtils.anyCauseMatches( - IllegalArgumentException.class, - "Version for time travel should be a LONG value representing snapshot id but was '1.5'.")); + UnsupportedOperationException.class, + "Only DataTable supports time travel but given table type is 'org.apache.paimon.table.system.SnapshotsTable'")); } @Test - public void testUnsupportedSystemTableTimeTravel() { + public void testTravelToTag() throws Exception { spark.sql("CREATE TABLE t (k INT, v STRING)"); - assertThatThrownBy(() -> spark.sql("SELECT * FROM `t$snapshots` VERSION AS OF 1")) + // snapshot 1 + writeTable( + "t", + GenericRow.of(1, BinaryString.fromString("Hello")), + GenericRow.of(2, BinaryString.fromString("Paimon"))); + + // snapshot 2 + writeTable( + "t", + GenericRow.of(3, BinaryString.fromString("Test")), + GenericRow.of(4, BinaryString.fromString("Case"))); + + // snapshot 3 + writeTable( + "t", + GenericRow.of(5, BinaryString.fromString("Time")), + GenericRow.of(6, BinaryString.fromString("Travel"))); + + getTable("t").createTag("tag2", 2); + + // time travel to tag2 + assertThat(spark.sql("SELECT * FROM t VERSION AS OF 'tag2'").collectAsList().toString()) + .isEqualTo("[[1,Hello], [2,Paimon], [3,Test], [4,Case]]"); + } + + @Test + public void testTravelToNonExistingTag() { + spark.sql("CREATE TABLE t (k INT, v STRING)"); + assertThatThrownBy( + () -> spark.sql("SELECT * FROM t VERSION AS OF 'unknown'").collectAsList()) .satisfies( AssertionUtils.anyCauseMatches( - UnsupportedOperationException.class, - "Only DataTable supports time travel but given table type is 'org.apache.paimon.table.system.SnapshotsTable'")); + IllegalArgumentException.class, "Tag 'unknown' doesn't exist.")); } } From a6ec9ce88a87b6163ae7002d14fa3c804a585fce Mon Sep 17 00:00:00 2001 From: yuzelin Date: Wed, 21 Jun 2023 11:41:45 +0800 Subject: [PATCH 2/6] [fix] address comments --- .../java/org/apache/paimon/CoreOptions.java | 19 ++--- .../operation/AbstractFileStoreScan.java | 36 +++------ .../paimon/schema/SchemaValidation.java | 76 +++++++++++++++---- .../paimon/table/AbstractFileStoreTable.java | 25 +++--- .../table/source/AbstractInnerTableScan.java | 35 +++------ 5 files changed, 102 insertions(+), 89 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java index bca215459841..7d4a93ec7db4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java @@ -893,10 +893,9 @@ public static StartupMode startupMode(Options options) { if (mode == StartupMode.DEFAULT) { if (options.getOptional(SCAN_TIMESTAMP_MILLIS).isPresent()) { return StartupMode.FROM_TIMESTAMP; - } else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent()) { + } else if (options.getOptional(SCAN_SNAPSHOT_ID).isPresent() + || options.getOptional(SCAN_TAG_NAME).isPresent()) { return StartupMode.FROM_SNAPSHOT; - } else if (options.getOptional(SCAN_TAG_NAME).isPresent()) { - return StartupMode.FROM_TAG; } else { return StartupMode.LATEST_FULL; } @@ -1056,20 +1055,16 @@ public enum StartupMode implements DescribedEnum { FROM_SNAPSHOT( "from-snapshot", - "For streaming sources, continuously reads changes " - + "starting from snapshot specified by \"scan.snapshot-id\", " - + "without producing a snapshot at the beginning. For batch sources, " - + "produces a snapshot specified by \"scan.snapshot-id\" but does not read new changes."), + "For streaming sources, continuously reads changes starting from snapshot " + + "specified by \"scan.snapshot-id\", without producing a snapshot at the beginning. " + + "For batch sources, produces a snapshot specified by \"scan.snapshot-id\" " + + "or \"scan.tag-name\" but does not read new changes."), FROM_SNAPSHOT_FULL( "from-snapshot-full", "For streaming sources, produces from snapshot specified by \"scan.snapshot-id\" " + "on the table upon first startup, and continuously reads changes. For batch sources, " - + "produces a snapshot specified by \"scan.snapshot-id\" but does not read new changes."), - - FROM_TAG( - "from-tag", - "This mode is only for batch sources. It produces a snapshot based on the given tag."); + + "produces a snapshot specified by \"scan.snapshot-id\" but does not read new changes."); private final String value; private final String description; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index ed193edd3da6..fd87f36f4efa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -69,7 +69,6 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { protected final ScanBucketFilter bucketFilter; private Predicate partitionFilter; - private Long specifiedSnapshotId = null; private Snapshot specifiedSnapshot = null; private Integer specifiedBucket = null; private List specifiedManifests = null; @@ -144,25 +143,24 @@ public FileStoreScan withPartitionBucket(BinaryRow partition, int bucket) { @Override public FileStoreScan withSnapshot(long snapshotId) { - this.specifiedSnapshotId = snapshotId; - checkState(specifiedSnapshot == null, "Cannot set both snapshot id and snapshot."); - checkState(specifiedManifests == null, "Cannot set both snapshot id and manifests."); + checkState(specifiedSnapshot == null, "Snapshot has been set."); + checkState(specifiedManifests == null, "Cannot set both snapshot and manifests."); + this.specifiedSnapshot = snapshotManager.snapshot(snapshotId); return this; } @Override public FileStoreScan withSnapshot(Snapshot snapshot) { - this.specifiedSnapshot = snapshot; - checkState(specifiedSnapshotId == null, "Cannot set both snapshot and snapshot id."); + checkState(specifiedSnapshot == null, "Snapshot has been set."); checkState(specifiedManifests == null, "Cannot set both snapshot and manifests."); + this.specifiedSnapshot = snapshot; return this; } @Override public FileStoreScan withManifestList(List manifests) { + checkState(specifiedSnapshot == null, "Cannot set both snapshot and manifests."); this.specifiedManifests = manifests; - checkState(specifiedSnapshot == null, "Cannot set both manifests and snapshot."); - checkState(specifiedSnapshotId == null, "Cannot set both manifests and snapshot id."); return this; } @@ -217,7 +215,10 @@ private Pair> doPlan( List manifests = specifiedManifests; Snapshot snapshot = null; if (manifests == null) { - snapshot = determineSnapshot(); + snapshot = + specifiedSnapshot == null + ? snapshotManager.latestSnapshot() + : specifiedSnapshot; if (snapshot == null) { manifests = Collections.emptyList(); } else { @@ -225,8 +226,6 @@ private Pair> doPlan( } } - final List readManifests = manifests; - Iterable entries = ParallellyExecuteUtils.parallelismBatchIterable( files -> @@ -235,7 +234,7 @@ private Pair> doPlan( .flatMap(m -> readManifest.apply(m).stream()) .filter(this::filterByStats) .collect(Collectors.toList()), - readManifests, + manifests, scanManifestParallelism); List files = new ArrayList<>(); @@ -270,19 +269,6 @@ private Pair> doPlan( return Pair.of(snapshot, files); } - @Nullable - private Snapshot determineSnapshot() { - if (specifiedSnapshot != null) { - return specifiedSnapshot; - } - - Long snapshotId = specifiedSnapshotId; - if (snapshotId == null) { - snapshotId = snapshotManager.latestSnapshotId(); - } - return snapshotId == null ? null : snapshotManager.snapshot(snapshotId); - } - private List readManifests(Snapshot snapshot) { switch (scanKind) { case ALL: diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index ee7b1a08fcb5..a657019b498e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -31,16 +31,19 @@ import org.apache.paimon.types.RowType; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.BUCKET_KEY; import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER; import static org.apache.paimon.CoreOptions.SCAN_MODE; import static org.apache.paimon.CoreOptions.SCAN_SNAPSHOT_ID; +import static org.apache.paimon.CoreOptions.SCAN_TAG_NAME; import static org.apache.paimon.CoreOptions.SCAN_TIMESTAMP_MILLIS; import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX; import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN; @@ -68,18 +71,8 @@ public static void validateTableSchema(TableSchema schema) { validatePrimaryKeysType(schema.fields(), schema.primaryKeys()); CoreOptions options = new CoreOptions(schema.options()); - if (options.startupMode() == CoreOptions.StartupMode.FROM_TIMESTAMP) { - checkOptionExistInMode( - options, SCAN_TIMESTAMP_MILLIS, CoreOptions.StartupMode.FROM_TIMESTAMP); - checkOptionsConflict(options, SCAN_SNAPSHOT_ID, SCAN_TIMESTAMP_MILLIS); - } else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT - || options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) { - checkOptionExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode()); - checkOptionsConflict(options, SCAN_TIMESTAMP_MILLIS, SCAN_SNAPSHOT_ID); - } else { - checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS, options.startupMode()); - checkOptionNotExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode()); - } + + validateStartupMode(options); if (options.writeMode() == WriteMode.APPEND_ONLY && options.changelogProducer() != CoreOptions.ChangelogProducer.NONE) { @@ -191,6 +184,34 @@ private static void validatePrimaryKeysType(List fields, List } } + private static void validateStartupMode(CoreOptions options) { + if (options.startupMode() == CoreOptions.StartupMode.FROM_TIMESTAMP) { + checkOptionExistInMode( + options, SCAN_TIMESTAMP_MILLIS, CoreOptions.StartupMode.FROM_TIMESTAMP); + checkOptionsConflict( + options, + Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME), + Collections.singletonList(SCAN_TIMESTAMP_MILLIS)); + } else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT) { + checkExactOneOptionExistInMode( + options, options.startupMode(), SCAN_SNAPSHOT_ID, SCAN_TAG_NAME); + checkOptionsConflict( + options, + Collections.singletonList(SCAN_TIMESTAMP_MILLIS), + Arrays.asList(SCAN_SNAPSHOT_ID, SCAN_TAG_NAME)); + } else if (options.startupMode() == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) { + checkOptionExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode()); + checkOptionsConflict( + options, + Arrays.asList(SCAN_TIMESTAMP_MILLIS, SCAN_TAG_NAME), + Collections.singletonList(SCAN_SNAPSHOT_ID)); + } else { + checkOptionNotExistInMode(options, SCAN_TIMESTAMP_MILLIS, options.startupMode()); + checkOptionNotExistInMode(options, SCAN_SNAPSHOT_ID, options.startupMode()); + checkOptionNotExistInMode(options, SCAN_TAG_NAME, options.startupMode()); + } + } + private static void checkOptionExistInMode( CoreOptions options, ConfigOption option, CoreOptions.StartupMode startupMode) { checkArgument( @@ -209,11 +230,34 @@ private static void checkOptionNotExistInMode( option.key(), startupMode, SCAN_MODE.key())); } - private static void checkOptionsConflict( - CoreOptions options, ConfigOption illegalOption, ConfigOption legalOption) { + private static void checkExactOneOptionExistInMode( + CoreOptions options, + CoreOptions.StartupMode startupMode, + ConfigOption... configOptions) { checkArgument( - !options.toConfiguration().contains(illegalOption), + Arrays.stream(configOptions) + .filter(op -> options.toConfiguration().contains(op)) + .count() + == 1, String.format( - "%s must be null when you set %s", illegalOption.key(), legalOption.key())); + "must set only one key in [%s] when you use %s for %s", + concatConfigKeys(Arrays.asList(configOptions)), + startupMode, + SCAN_MODE.key())); + } + + private static void checkOptionsConflict( + CoreOptions options, + List> illegalOptions, + List> legalOptions) { + checkArgument( + illegalOptions.stream().noneMatch(op -> options.toConfiguration().contains(op)), + "[%s] must be null when you set [%s]", + concatConfigKeys(illegalOptions), + concatConfigKeys(legalOptions)); + } + + private static String concatConfigKeys(List> configOptions) { + return configOptions.stream().map(ConfigOption::key).collect(Collectors.joining(",")); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index d6f3a6fcbfbc..78eceeb98c8c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -243,10 +243,19 @@ private Optional tryTimeTravel(Options options) { switch (coreOptions.startupMode()) { case FROM_SNAPSHOT: case FROM_SNAPSHOT_FULL: - long snapshotId = coreOptions.scanSnapshotId(); - if (snapshotManager().snapshotExists(snapshotId)) { - long schemaId = snapshotManager().snapshot(snapshotId).schemaId(); - return Optional.of(schemaManager().schema(schemaId).copy(options.toMap())); + if (coreOptions.scanSnapshotId() != null) { + long snapshotId = coreOptions.scanSnapshotId(); + if (snapshotManager().snapshotExists(snapshotId)) { + long schemaId = snapshotManager().snapshot(snapshotId).schemaId(); + return Optional.of(schemaManager().schema(schemaId).copy(options.toMap())); + } + } else { + String tagName = coreOptions.scanTagName(); + TagManager tagManager = new TagManager(fileIO, path); + if (tagManager.tagExists(tagName)) { + long schemaId = tagManager.taggedSnapshot(tagName).schemaId(); + return Optional.of(schemaManager().schema(schemaId).copy(options.toMap())); + } } return Optional.empty(); case FROM_TIMESTAMP: @@ -258,14 +267,6 @@ private Optional tryTimeTravel(Options options) { return Optional.of(schemaManager().schema(schemaId).copy(options.toMap())); } return Optional.empty(); - case FROM_TAG: - String tagName = coreOptions.scanTagName(); - TagManager tagManager = new TagManager(fileIO, path); - if (tagManager.tagExists(tagName)) { - long schemaId = tagManager.taggedSnapshot(tagName).schemaId(); - return Optional.of(schemaManager().schema(schemaId).copy(options.toMap())); - } - return Optional.empty(); default: return Optional.empty(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java index 1fdc693549f3..2ff81d55d987 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java @@ -37,12 +37,12 @@ import org.apache.paimon.table.source.snapshot.StaticFromSnapshotStartingScanner; import org.apache.paimon.table.source.snapshot.StaticFromTagStartingScanner; import org.apache.paimon.table.source.snapshot.StaticFromTimestampStartingScanner; -import org.apache.paimon.utils.Preconditions; import java.util.List; import java.util.Optional; import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS; +import static org.apache.paimon.utils.Preconditions.checkArgument; /** An abstraction layer above {@link FileStoreScan} to provide input split generation. */ public abstract class AbstractInnerTableScan implements InnerTableScan { @@ -71,7 +71,7 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { switch (type) { case NORMAL: { - Preconditions.checkArgument( + checkArgument( isStreaming, "Set 'streaming-compact' in batch mode. This is unexpected."); return new ContinuousCompactorStartingScanner(); @@ -113,33 +113,20 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { } case FROM_TIMESTAMP: Long startupMillis = options.scanTimestampMills(); - Preconditions.checkNotNull( - startupMillis, - String.format( - "%s can not be null when you use %s for %s", - CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), - CoreOptions.StartupMode.FROM_TIMESTAMP, - CoreOptions.SCAN_MODE.key())); return isStreaming ? new ContinuousFromTimestampStartingScanner(startupMillis) : new StaticFromTimestampStartingScanner(startupMillis); case FROM_SNAPSHOT: + if (options.scanSnapshotId() != null) { + return isStreaming + ? new ContinuousFromSnapshotStartingScanner(options.scanSnapshotId()) + : new StaticFromSnapshotStartingScanner(options.scanSnapshotId()); + } else { + checkArgument(!isStreaming, "Cannot scan from tag in streaming mode."); + return new StaticFromTagStartingScanner(options().scanTagName()); + } case FROM_SNAPSHOT_FULL: - Long snapshotId = options.scanSnapshotId(); - Preconditions.checkNotNull( - snapshotId, - String.format( - "%s can not be null when you use %s for %s", - CoreOptions.SCAN_SNAPSHOT_ID.key(), - startupMode, - CoreOptions.SCAN_MODE.key())); - return isStreaming && startupMode == CoreOptions.StartupMode.FROM_SNAPSHOT - ? new ContinuousFromSnapshotStartingScanner(snapshotId) - : new StaticFromSnapshotStartingScanner(snapshotId); - case FROM_TAG: - Preconditions.checkArgument( - !isStreaming, "Cannot scan from tag in streaming mode."); - return new StaticFromTagStartingScanner(options().scanTagName()); + return new StaticFromSnapshotStartingScanner(options.scanSnapshotId()); default: throw new UnsupportedOperationException( "Unknown startup mode " + startupMode.name()); From 361a4d8d2368cf52e7d227db772f8966386730a2 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Wed, 21 Jun 2023 14:49:39 +0800 Subject: [PATCH 3/6] [fix] fix core options --- docs/layouts/shortcodes/generated/core_configuration.html | 2 +- paimon-core/src/main/java/org/apache/paimon/CoreOptions.java | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index 281df20ffc8f..53c514d36fd7 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -348,7 +348,7 @@
scan.mode
default

Enum

- Specify the scanning behavior of the source.

Possible values:
  • "default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp"; if "scan.snapshot-id" is set the actual startup mode will be "from-snapshot"; and if "scan.tag-name" ise set the actual startup mode will be "from-tag". Otherwise the actual startup mode will be "latest-full".
  • "latest-full": For streaming sources, produces the latest snapshot on the table upon first startup, and continue to read the latest changes. For batch sources, just produce the latest snapshot but does not read new changes.
  • "full": Deprecated. Same as "latest-full".
  • "latest": For streaming sources, continuously reads latest changes without producing a snapshot at the beginning. For batch sources, behaves the same as the "latest-full" startup mode.
  • "compacted-full": For streaming sources, produces a snapshot after the latest compaction on the table upon first startup, and continue to read the latest changes. For batch sources, just produce a snapshot after the latest compaction but does not read new changes. Snapshots of full compaction are picked when scheduled full-compaction is enabled.
  • "from-timestamp": For streaming sources, continuously reads changes starting from timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning. For batch sources, produces a snapshot at timestamp specified by "scan.timestamp-millis" but does not read new changes.
  • "from-snapshot": For streaming sources, continuously reads changes starting from snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning. For batch sources, produces a snapshot specified by "scan.snapshot-id" but does not read new changes.
  • "from-snapshot-full": For streaming sources, produces from snapshot specified by "scan.snapshot-id" on the table upon first startup, and continuously reads changes. For batch sources, produces a snapshot specified by "scan.snapshot-id" but does not read new changes.
  • "from-tag": This mode is only for batch sources. It produces a snapshot based on the given tag.
+ Specify the scanning behavior of the source.

Possible values:
  • "default": Determines actual startup mode according to other table properties. If "scan.timestamp-millis" is set the actual startup mode will be "from-timestamp", and if "scan.snapshot-id" or "scan.tag-name" is set the actual startup mode will be "from-snapshot". Otherwise the actual startup mode will be "latest-full".
  • "latest-full": For streaming sources, produces the latest snapshot on the table upon first startup, and continue to read the latest changes. For batch sources, just produce the latest snapshot but does not read new changes.
  • "full": Deprecated. Same as "latest-full".
  • "latest": For streaming sources, continuously reads latest changes without producing a snapshot at the beginning. For batch sources, behaves the same as the "latest-full" startup mode.
  • "compacted-full": For streaming sources, produces a snapshot after the latest compaction on the table upon first startup, and continue to read the latest changes. For batch sources, just produce a snapshot after the latest compaction but does not read new changes. Snapshots of full compaction are picked when scheduled full-compaction is enabled.
  • "from-timestamp": For streaming sources, continuously reads changes starting from timestamp specified by "scan.timestamp-millis", without producing a snapshot at the beginning. For batch sources, produces a snapshot at timestamp specified by "scan.timestamp-millis" but does not read new changes.
  • "from-snapshot": For streaming sources, continuously reads changes starting from snapshot specified by "scan.snapshot-id", without producing a snapshot at the beginning. For batch sources, produces a snapshot specified by "scan.snapshot-id" or "scan.tag-name" but does not read new changes.
  • "from-snapshot-full": For streaming sources, produces from snapshot specified by "scan.snapshot-id" on the table upon first startup, and continuously reads changes. For batch sources, produces a snapshot specified by "scan.snapshot-id" but does not read new changes.
scan.plan-sort-partition
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java index 7d4a93ec7db4..c08d6e9baca6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java @@ -1018,9 +1018,8 @@ public enum StartupMode implements DescribedEnum { DEFAULT( "default", "Determines actual startup mode according to other table properties. " - + "If \"scan.timestamp-millis\" is set the actual startup mode will be \"from-timestamp\"; " - + "if \"scan.snapshot-id\" is set the actual startup mode will be \"from-snapshot\"; " - + "and if \"scan.tag-name\" ise set the actual startup mode will be \"from-tag\". " + + "If \"scan.timestamp-millis\" is set the actual startup mode will be \"from-timestamp\", " + + "and if \"scan.snapshot-id\" or \"scan.tag-name\" is set the actual startup mode will be \"from-snapshot\". " + "Otherwise the actual startup mode will be \"latest-full\"."), LATEST_FULL( From 53b8611ebfee84ccc748969baca7445c697aaedd Mon Sep 17 00:00:00 2001 From: yuzelin Date: Wed, 21 Jun 2023 15:30:33 +0800 Subject: [PATCH 4/6] [fix] fix failed test --- .../java/org/apache/paimon/operation/AbstractFileStoreScan.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index fd87f36f4efa..836e386caeaf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -143,7 +143,6 @@ public FileStoreScan withPartitionBucket(BinaryRow partition, int bucket) { @Override public FileStoreScan withSnapshot(long snapshotId) { - checkState(specifiedSnapshot == null, "Snapshot has been set."); checkState(specifiedManifests == null, "Cannot set both snapshot and manifests."); this.specifiedSnapshot = snapshotManager.snapshot(snapshotId); return this; @@ -151,7 +150,6 @@ public FileStoreScan withSnapshot(long snapshotId) { @Override public FileStoreScan withSnapshot(Snapshot snapshot) { - checkState(specifiedSnapshot == null, "Snapshot has been set."); checkState(specifiedManifests == null, "Cannot set both snapshot and manifests."); this.specifiedSnapshot = snapshot; return this; From e820d45bb84fddc4156d51ed3d729662956464e4 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Wed, 21 Jun 2023 16:07:47 +0800 Subject: [PATCH 5/6] [fix] fix --- .../java/org/apache/paimon/flink/BatchFileStoreITCase.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index b7b9b0eccc65..88eb01619a3c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -122,9 +122,7 @@ public void testTimeTravelRead() throws Exception { time3))) .hasRootCauseInstanceOf(IllegalArgumentException.class) .hasRootCauseMessage( - "%s must be null when you set %s", - CoreOptions.SCAN_SNAPSHOT_ID.key(), - CoreOptions.SCAN_TIMESTAMP_MILLIS.key()); + "[scan.snapshot-id,scan.tag-name] must be null when you set [scan.timestamp-millis]"); assertThatThrownBy( () -> From f612ec281c125bc3d6a3be3045cb327fa2f10942 Mon Sep 17 00:00:00 2001 From: yuzelin Date: Wed, 21 Jun 2023 17:24:15 +0800 Subject: [PATCH 6/6] [fix] fix --- .../paimon/flink/BatchFileStoreITCase.java | 21 +++++++++++++++---- .../paimon/flink/CatalogITCaseBase.java | 17 +++++++-------- .../paimon/flink/BatchFileStoreITCase.java | 21 +++++++++++++++---- .../paimon/flink/CatalogITCaseBase.java | 17 +++++++-------- 4 files changed, 48 insertions(+), 28 deletions(-) diff --git a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 59cc98ce4d16..8f601761479d 100644 --- a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -52,7 +52,7 @@ public void testOverwriteEmpty() { } @Test - public void testTimeTravelRead() throws InterruptedException { + public void testTimeTravelRead() throws Exception { batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)"); long time1 = System.currentTimeMillis(); @@ -67,6 +67,8 @@ public void testTimeTravelRead() throws InterruptedException { Thread.sleep(10); batchSql("INSERT INTO T VALUES (7, 77, 777), (8, 88, 888)"); + paimonTable("T").createTag("tag2", 2); + assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='1') */")) .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222)); @@ -125,9 +127,7 @@ public void testTimeTravelRead() throws InterruptedException { time3))) .hasRootCauseInstanceOf(IllegalArgumentException.class) .hasRootCauseMessage( - "%s must be null when you set %s", - CoreOptions.SCAN_SNAPSHOT_ID.key(), - CoreOptions.SCAN_TIMESTAMP_MILLIS.key()); + "[scan.snapshot-id,scan.tag-name] must be null when you set [scan.timestamp-millis]"); assertThatThrownBy( () -> @@ -137,5 +137,18 @@ public void testTimeTravelRead() throws InterruptedException { .hasRootCauseMessage( "%s must be null when you use latest-full for scan.mode", CoreOptions.SCAN_SNAPSHOT_ID.key()); + + // travel to tag + assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag2') */")) + .containsExactlyInAnyOrder( + Row.of(1, 11, 111), + Row.of(2, 22, 222), + Row.of(3, 33, 333), + Row.of(4, 44, 444)); + + assertThatThrownBy( + () -> batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='unknown') */")) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Tag 'unknown' doesn't exist."); } } diff --git a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java index 0f656904074d..d14e0649508f 100644 --- a/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java +++ b/paimon-flink/paimon-flink-1.14/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java @@ -19,8 +19,10 @@ package org.apache.paimon.flink; import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.table.Table; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; @@ -29,11 +31,6 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentImpl; -import org.apache.flink.table.catalog.Catalog; -import org.apache.flink.table.catalog.CatalogBaseTable; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateCatalogOperation; @@ -127,11 +124,11 @@ protected CloseableIterator streamSqlIter(String query, Object... args) { return sEnv.executeSql(String.format(query, args)).collect(); } - protected CatalogTable table(String tableName) throws TableNotExistException { - Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get(); - CatalogBaseTable table = - catalog.getTable(new ObjectPath(catalog.getDefaultDatabase(), tableName)); - return (CatalogTable) table; + protected Table paimonTable(String tableName) + throws org.apache.paimon.catalog.Catalog.TableNotExistException { + FlinkCatalog flinkCatalog = (FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get(); + org.apache.paimon.catalog.Catalog catalog = flinkCatalog.catalog(); + return catalog.getTable(Identifier.create(tEnv.getCurrentDatabase(), tableName)); } protected Path getTableDirectory(String tableName) { diff --git a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index 59cc98ce4d16..8f601761479d 100644 --- a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -52,7 +52,7 @@ public void testOverwriteEmpty() { } @Test - public void testTimeTravelRead() throws InterruptedException { + public void testTimeTravelRead() throws Exception { batchSql("INSERT INTO T VALUES (1, 11, 111), (2, 22, 222)"); long time1 = System.currentTimeMillis(); @@ -67,6 +67,8 @@ public void testTimeTravelRead() throws InterruptedException { Thread.sleep(10); batchSql("INSERT INTO T VALUES (7, 77, 777), (8, 88, 888)"); + paimonTable("T").createTag("tag2", 2); + assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.snapshot-id'='1') */")) .containsExactlyInAnyOrder(Row.of(1, 11, 111), Row.of(2, 22, 222)); @@ -125,9 +127,7 @@ public void testTimeTravelRead() throws InterruptedException { time3))) .hasRootCauseInstanceOf(IllegalArgumentException.class) .hasRootCauseMessage( - "%s must be null when you set %s", - CoreOptions.SCAN_SNAPSHOT_ID.key(), - CoreOptions.SCAN_TIMESTAMP_MILLIS.key()); + "[scan.snapshot-id,scan.tag-name] must be null when you set [scan.timestamp-millis]"); assertThatThrownBy( () -> @@ -137,5 +137,18 @@ public void testTimeTravelRead() throws InterruptedException { .hasRootCauseMessage( "%s must be null when you use latest-full for scan.mode", CoreOptions.SCAN_SNAPSHOT_ID.key()); + + // travel to tag + assertThat(batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='tag2') */")) + .containsExactlyInAnyOrder( + Row.of(1, 11, 111), + Row.of(2, 22, 222), + Row.of(3, 33, 333), + Row.of(4, 44, 444)); + + assertThatThrownBy( + () -> batchSql("SELECT * FROM T /*+ OPTIONS('scan.tag-name'='unknown') */")) + .hasRootCauseInstanceOf(IllegalArgumentException.class) + .hasRootCauseMessage("Tag 'unknown' doesn't exist."); } } diff --git a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java index 0f656904074d..d14e0649508f 100644 --- a/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java +++ b/paimon-flink/paimon-flink-1.15/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java @@ -19,8 +19,10 @@ package org.apache.paimon.flink; import org.apache.paimon.Snapshot; +import org.apache.paimon.catalog.Identifier; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.table.Table; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; @@ -29,11 +31,6 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.internal.TableEnvironmentImpl; -import org.apache.flink.table.catalog.Catalog; -import org.apache.flink.table.catalog.CatalogBaseTable; -import org.apache.flink.table.catalog.CatalogTable; -import org.apache.flink.table.catalog.ObjectPath; -import org.apache.flink.table.catalog.exceptions.TableNotExistException; import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.operations.Operation; import org.apache.flink.table.operations.ddl.CreateCatalogOperation; @@ -127,11 +124,11 @@ protected CloseableIterator streamSqlIter(String query, Object... args) { return sEnv.executeSql(String.format(query, args)).collect(); } - protected CatalogTable table(String tableName) throws TableNotExistException { - Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get(); - CatalogBaseTable table = - catalog.getTable(new ObjectPath(catalog.getDefaultDatabase(), tableName)); - return (CatalogTable) table; + protected Table paimonTable(String tableName) + throws org.apache.paimon.catalog.Catalog.TableNotExistException { + FlinkCatalog flinkCatalog = (FlinkCatalog) tEnv.getCatalog(tEnv.getCurrentCatalog()).get(); + org.apache.paimon.catalog.Catalog catalog = flinkCatalog.catalog(); + return catalog.getTable(Identifier.create(tEnv.getCurrentDatabase(), tableName)); } protected Path getTableDirectory(String tableName) {