diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/SupplierWithIOException.java b/paimon-common/src/main/java/org/apache/paimon/utils/SupplierWithIOException.java new file mode 100644 index 000000000000..f5c79817e413 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/utils/SupplierWithIOException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import java.io.IOException; +import java.util.function.Supplier; + +/** A {@link Supplier} with {@link IOException}. */ +@FunctionalInterface +public interface SupplierWithIOException { + + T get() throws IOException; +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index fc2e1200f08d..c2b9be4c2725 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -37,6 +37,7 @@ import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SerializableConsumer; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.SupplierWithIOException; import org.apache.paimon.utils.TagManager; import org.slf4j.Logger; @@ -322,13 +323,13 @@ protected List tryBestListingDirs(Path dir) { * {@link FileNotFoundException}, return default value. Finally, if retry times reaches the * limits, rethrow the IOException. */ - protected static T retryReadingFiles(ReaderWithIOException reader, T defaultValue) + protected static T retryReadingFiles(SupplierWithIOException reader, T defaultValue) throws IOException { int retryNumber = 0; IOException caught = null; while (retryNumber++ < READ_FILE_RETRY_NUM) { try { - return reader.read(); + return reader.get(); } catch (FileNotFoundException e) { return defaultValue; } catch (IOException e) { @@ -349,13 +350,6 @@ protected boolean oldEnough(FileStatus status) { return status.getModificationTime() < olderThanMillis; } - /** A helper functional interface for method {@link #retryReadingFiles}. */ - @FunctionalInterface - protected interface ReaderWithIOException { - - T read() throws IOException; - } - public static SerializableConsumer createFileCleaner( Catalog catalog, @Nullable Boolean dryRun) { SerializableConsumer fileCleaner; diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 7e008698c4fd..935469a8196d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -541,7 +541,7 @@ private Optional travelToVersion(String version, Options options) { } private Optional travelToTag(String tagName, Options options) { - return travelToSnapshot(tagManager().taggedSnapshot(tagName), options); + return travelToSnapshot(tagManager().getOrThrow(tagName).trimToSnapshot(), options); } private Optional travelToSnapshot(long snapshotId, Options options) { @@ -633,7 +633,9 @@ public void createTag(String tagName, Duration timeRetained) { } private void createTag(String tagName, Snapshot fromSnapshot, @Nullable Duration timeRetained) { - tagManager().createTag(fromSnapshot, tagName, timeRetained, store().createTagCallbacks()); + tagManager() + .createTag( + fromSnapshot, tagName, timeRetained, store().createTagCallbacks(), false); } @Override @@ -689,7 +691,7 @@ public void rollbackTo(String tagName) { TagManager tagManager = tagManager(); checkArgument(tagManager.tagExists(tagName), "Rollback tag '%s' doesn't exist.", tagName); - Snapshot taggedSnapshot = tagManager.taggedSnapshot(tagName); + Snapshot taggedSnapshot = tagManager.getOrThrow(tagName).trimToSnapshot(); rollbackHelper().cleanLargerThan(taggedSnapshot); try { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java index 2cdf5bff9d26..e08ac9f44c60 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/IncrementalTagStartingScanner.java @@ -25,36 +25,27 @@ /** {@link StartingScanner} for incremental changes by tag. */ public class IncrementalTagStartingScanner extends AbstractStartingScanner { - private final String start; - private final String end; + private final Snapshot start; + private final Snapshot end; public IncrementalTagStartingScanner( - SnapshotManager snapshotManager, String start, String end) { + SnapshotManager snapshotManager, String startTagName, String endTagName) { super(snapshotManager); - this.start = start; - this.end = end; TagManager tagManager = new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); - Snapshot startingSnapshot = tagManager.taggedSnapshot(start); - if (startingSnapshot != null) { - this.startingSnapshotId = startingSnapshot.id(); - } - } - - @Override - public Result scan(SnapshotReader reader) { - TagManager tagManager = - new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); - Snapshot tag1 = tagManager.taggedSnapshot(start); - Snapshot tag2 = tagManager.taggedSnapshot(end); - - if (tag2.id() <= tag1.id()) { + start = tagManager.getOrThrow(startTagName).trimToSnapshot(); + end = tagManager.getOrThrow(endTagName).trimToSnapshot(); + if (end.id() <= start.id()) { throw new IllegalArgumentException( String.format( "Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s", - end, tag2.id(), start, tag1.id())); + endTagName, end.id(), startTagName, start.id())); } + this.startingSnapshotId = start.id(); + } - return StartingScanner.fromPlan(reader.withSnapshot(tag2).readIncrementalDiff(tag1)); + @Override + public Result scan(SnapshotReader reader) { + return StartingScanner.fromPlan(reader.withSnapshot(end).readIncrementalDiff(start)); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java index 4fa070299f4f..b22e17e9a026 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/StaticFromTagStartingScanner.java @@ -43,7 +43,7 @@ public ScanMode startingScanMode() { public SnapshotReader configure(SnapshotReader snapshotReader) { TagManager tagManager = new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); - Snapshot snapshot = tagManager.taggedSnapshot(tagName); + Snapshot snapshot = tagManager.getOrThrow(tagName).trimToSnapshot(); return snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java index 4c8b41aa4215..5b4ee4e58c50 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/TimeTravelUtil.java @@ -122,6 +122,6 @@ private static Snapshot resolveSnapshotByTagName( String tagName = options.scanTagName(); TagManager tagManager = new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath()); - return tagManager.taggedSnapshot(tagName); + return tagManager.getOrThrow(tagName).trimToSnapshot(); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java index 9aafdb5983fd..8f28be8af2db 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/TagsTable.java @@ -59,6 +59,7 @@ import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -227,27 +228,25 @@ public RecordReader createReader(Split split) { && ((LeafPredicate) predicate).literals().get(0) instanceof BinaryString && predicate.visit(LeafPredicateExtractor.INSTANCE).get(TAG_NAME) != null) { String equalValue = ((LeafPredicate) predicate).literals().get(0).toString(); - if (tagManager.tagExists(equalValue)) { - predicateMap.put(equalValue, tagManager.tag(equalValue)); - } + tagManager.get(equalValue).ifPresent(tag -> predicateMap.put(equalValue, tag)); } if (predicate instanceof CompoundPredicate) { CompoundPredicate compoundPredicate = (CompoundPredicate) predicate; // optimize for IN filter if ((compoundPredicate.function()) instanceof Or) { + List tagNames = new ArrayList<>(); InPredicateVisitor.extractInElements(predicate, TAG_NAME) .ifPresent( - leafs -> - leafs.forEach( - leaf -> { - String leftName = leaf.toString(); - if (tagManager.tagExists(leftName)) { - predicateMap.put( - leftName, - tagManager.tag(leftName)); - } - })); + e -> + e.stream() + .map(Object::toString) + .forEach(tagNames::add)); + tagNames.forEach( + name -> + tagManager + .get(name) + .ifPresent(value -> predicateMap.put(name, value))); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java index 3989786bd277..5232e89a8b5e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java +++ b/paimon-core/src/main/java/org/apache/paimon/tag/TagAutoCreation.java @@ -167,9 +167,8 @@ private void tryToCreateTags(Snapshot snapshot) { } String tagName = periodHandler.timeToTag(thisTag); LOG.info("The tag name is {}.", tagName); - if (!tagManager.tagExists(tagName)) { - tagManager.createTag(snapshot, tagName, defaultTimeRetained, callbacks); - } + // shouldn't throw exception when tag exists + tagManager.createTag(snapshot, tagName, defaultTimeRetained, callbacks, true); nextTag = periodHandler.nextTagTime(thisTag); LOG.info("The next tag time after this is {}.", nextTag); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 2ea5f542f4e5..7dfb30e69801 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -107,9 +107,7 @@ branchName, branchPath(tablePath, branchName)), public void createBranch(String branchName, String tagName) { validateBranch(branchName); - checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not exists.", tagName); - - Snapshot snapshot = tagManager.taggedSnapshot(tagName); + Snapshot snapshot = tagManager.getOrThrow(tagName).trimToSnapshot(); try { // Copy the corresponding tag, snapshot and schema files into the branch directory diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 4019395d8d65..4713703bbd80 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -41,6 +41,7 @@ import java.util.Collection; import java.util.Comparator; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -97,31 +98,34 @@ public List tagPaths(Predicate predicate) throws IOException { /** Create a tag from given snapshot and save it in the storage. */ public void createTag( - Snapshot snapshot, String tagName, Duration timeRetained, List callbacks) { - checkArgument( - !StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName); - checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", tagName); + Snapshot snapshot, + String tagName, + Duration timeRetained, + List callbacks, + boolean ignoreIfExists) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name shouldn't be blank."); + if (tagExists(tagName)) { + checkArgument(ignoreIfExists, "Tag '%s' already exists.", tagName); + return; + } createOrReplaceTag(snapshot, tagName, timeRetained, callbacks); } /** Replace a tag from given snapshot and save it in the storage. */ public void replaceTag(Snapshot snapshot, String tagName, Duration timeRetained) { - checkArgument( - !StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName); - checkArgument(tagExists(tagName), "Tag name '%s' does not exist.", tagName); + checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name shouldn't be blank."); + checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName); createOrReplaceTag(snapshot, tagName, timeRetained, null); } - public void createOrReplaceTag( + private void createOrReplaceTag( Snapshot snapshot, String tagName, @Nullable Duration timeRetained, @Nullable List callbacks) { - // When timeRetained is not defined, please do not write the tagCreatorTime field, - // as this will cause older versions (<= 0.7) of readers to be unable to read this - // tag. - // When timeRetained is defined, it is fine, because timeRetained is the new - // feature. + // When timeRetained is not defined, please do not write the tagCreatorTime field, as this + // will cause older versions (<= 0.7) of readers to be unable to read this tag. + // When timeRetained is defined, it is fine, because timeRetained is the new feature. String content = timeRetained != null ? Tag.fromSnapshotAndTagTtl(snapshot, timeRetained, LocalDateTime.now()) @@ -152,17 +156,17 @@ public void createOrReplaceTag( } public void renameTag(String tagName, String targetTagName) { + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(tagName), + "Original tag name shouldn't be blank."); + checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName); + + checkArgument( + !StringUtils.isNullOrWhitespaceOnly(targetTagName), + "New tag name shouldn't be blank."); + checkArgument(!tagExists(targetTagName), "Tag '%s' already exists.", tagName); + try { - if (!tagExists(tagName)) { - throw new RuntimeException( - String.format("The specified tag name [%s] does not exist.", tagName)); - } - if (tagExists(targetTagName)) { - throw new RuntimeException( - String.format( - "The specified target tag name [%s] existed, please set a non-existent tag name.", - targetTagName)); - } fileIO.rename(tagPath(tagName), tagPath(targetTagName)); } catch (IOException e) { throw new RuntimeException(e); @@ -172,7 +176,7 @@ public void renameTag(String tagName, String targetTagName) { /** Make sure the tagNames are ALL tags of one snapshot. */ public void deleteAllTagsOfOneSnapshot( List tagNames, TagDeletion tagDeletion, SnapshotManager snapshotManager) { - Snapshot taggedSnapshot = taggedSnapshot(tagNames.get(0)); + Snapshot taggedSnapshot = getOrThrow(tagNames.get(0)).trimToSnapshot(); List taggedSnapshots; // skip file deletion if snapshot exists @@ -188,19 +192,20 @@ public void deleteAllTagsOfOneSnapshot( doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion); } + /** Ignore errors if the tag doesn't exist. */ public void deleteTag( String tagName, TagDeletion tagDeletion, SnapshotManager snapshotManager, List callbacks) { - checkArgument( - !StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName); - if (!tagExists(tagName)) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name shouldn't be blank."); + Optional tag = get(tagName); + if (!tag.isPresent()) { LOG.warn("Tag '{}' doesn't exist.", tagName); return; } - Snapshot taggedSnapshot = taggedSnapshot(tagName); + Snapshot taggedSnapshot = tag.get().trimToSnapshot(); List taggedSnapshots; // skip file deletion if snapshot exists @@ -303,10 +308,21 @@ public boolean tagExists(String tagName) { } } - /** Get the tagged snapshot by name. */ - public Snapshot taggedSnapshot(String tagName) { - checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName); - return Tag.fromPath(fileIO, tagPath(tagName)).trimToSnapshot(); + /** Return the tag or Optional.empty() if the tag file not found. */ + public Optional get(String tagName) { + checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name shouldn't be blank."); + try { + return Optional.of(Tag.tryFromPath(fileIO, tagPath(tagName))); + } catch (FileNotFoundException e) { + return Optional.empty(); + } + } + + /** Return the tag or throw exception indicating the tag not found. */ + public Tag getOrThrow(String tagName) { + return get(tagName) + .orElseThrow( + () -> new IllegalArgumentException("Tag '" + tagName + "' doesn't exist.")); } public long tagCount() { @@ -410,12 +426,7 @@ private int findIndex(Snapshot taggedSnapshot, List taggedSnapshots) { } throw new RuntimeException( String.format( - "Didn't find tag with snapshot id '%s'.This is unexpected.", + "Didn't find tag with snapshot id '%s'. This is unexpected.", taggedSnapshot.id())); } - - /** Read tag for tagName. */ - public Tag tag(String tagName) { - return Tag.fromPath(fileIO, tagPath(tagName)); - } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index abff820b2cb4..5811fc37216b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -151,7 +151,8 @@ public void testMixedSnapshotAndTagDeletion() throws Exception { snapshot, "tag" + id, store.options().tagDefaultTimeRetained(), - Collections.emptyList()); + Collections.emptyList(), + false); } // randomly expire snapshots diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java index 3a5ee93daa37..45b53ddba441 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java @@ -313,7 +313,7 @@ public void testExpireWithExistingTags() throws Exception { // check manifests ManifestList manifestList = store.manifestListFactory().create(); for (String tagName : Arrays.asList("tag1", "tag2")) { - Snapshot snapshot = tagManager.taggedSnapshot(tagName); + Snapshot snapshot = tagManager.getOrThrow(tagName); List manifestFilePaths = manifestList.readDataManifests(snapshot).stream() .map(ManifestFileMeta::fileName) @@ -367,7 +367,7 @@ public void testExpireWithUpgradeAndTags() throws Exception { FileStorePathFactory pathFactory = store.pathFactory(); assertPathExists(fileIO, pathFactory.bucketPath(partition, 0)); - Snapshot tag1 = tagManager.taggedSnapshot("tag1"); + Snapshot tag1 = tagManager.getOrThrow("tag1"); ManifestList manifestList = store.manifestListFactory().create(); List manifestFilePaths = manifestList.readDataManifests(tag1).stream() @@ -519,8 +519,8 @@ public void testDeleteTagWithOtherTag() throws Exception { assertPathNotExists(fileIO, pathFactory.bucketPath(partition, 1)); // check manifests - Snapshot tag1 = tagManager.taggedSnapshot("tag1"); - Snapshot tag3 = tagManager.taggedSnapshot("tag3"); + Snapshot tag1 = tagManager.getOrThrow("tag1"); + Snapshot tag3 = tagManager.getOrThrow("tag3"); List existing = manifestList.readDataManifests(tag1); existing.addAll(manifestList.readDataManifests(tag3)); for (ManifestFileMeta manifestFileMeta : snapshot2Data) { @@ -805,6 +805,6 @@ private void cleanBucket(TestFileStore store, BinaryRow partition, int bucket) { } private void createTag(Snapshot snapshot, String tagName, Duration timeRetained) { - tagManager.createTag(snapshot, tagName, timeRetained, Collections.emptyList()); + tagManager.createTag(snapshot, tagName, timeRetained, Collections.emptyList(), false); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index 75e284a68c3a..ecb42d766901 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -1080,7 +1080,7 @@ public void testCreateTag() throws Exception { assertThat(tagManager.tagExists("test-tag")).isTrue(); // verify that test-tag is equal to snapshot 2 - Snapshot tagged = tagManager.taggedSnapshot("test-tag"); + Snapshot tagged = tagManager.getOrThrow("test-tag").trimToSnapshot(); Snapshot snapshot2 = table.snapshotManager().snapshot(2); assertThat(tagged.equals(snapshot2)).isTrue(); } @@ -1103,7 +1103,7 @@ public void testCreateTagOnExpiredSnapshot() throws Exception { TagManager tagManager = new TagManager(new TraceableFileIO(), tablePath); assertThat(tagManager.tagExists("test-tag")).isTrue(); // verify that test-tag is equal to snapshot 1 - Snapshot tagged = tagManager.taggedSnapshot("test-tag"); + Snapshot tagged = tagManager.getOrThrow("test-tag").trimToSnapshot(); Snapshot snapshot1 = table.snapshotManager().snapshot(1); assertThat(tagged.equals(snapshot1)).isTrue(); // snapshot 2 @@ -1116,7 +1116,7 @@ public void testCreateTagOnExpiredSnapshot() throws Exception { // verify that tag file exist assertThat(tagManager.tagExists("test-tag-2")).isTrue(); // verify that test-tag is equal to snapshot 1 - Snapshot tag2 = tagManager.taggedSnapshot("test-tag-2"); + Snapshot tag2 = tagManager.getOrThrow("test-tag-2").trimToSnapshot(); assertThat(tag2.equals(snapshot1)).isTrue(); } } @@ -1138,9 +1138,9 @@ public void testCreateSameTagName() throws Exception { assertThat(tagManager.tagExists("test-tag")).isTrue(); // Create again failed if tag existed Assertions.assertThatThrownBy(() -> table.createTag("test-tag", 1)) - .hasMessageContaining("Tag name 'test-tag' already exists."); + .hasMessageContaining("Tag 'test-tag' already exists."); Assertions.assertThatThrownBy(() -> table.createTag("test-tag", 2)) - .hasMessageContaining("Tag name 'test-tag' already exists."); + .hasMessageContaining("Tag 'test-tag' already exists."); } } @@ -1165,7 +1165,7 @@ public void testCreateBranch() throws Exception { assertThat(tagManager.tagExists("test-tag")).isTrue(); // verify that test-tag is equal to snapshot 2 - Snapshot tagged = tagManager.taggedSnapshot("test-tag"); + Snapshot tagged = tagManager.getOrThrow("test-tag").trimToSnapshot(); Snapshot snapshot2 = table.snapshotManager().snapshot(2); assertThat(tagged.equals(snapshot2)).isTrue(); @@ -1220,7 +1220,7 @@ public void testUnsupportedBranchName() throws Exception { assertThatThrownBy(() -> table.createBranch("branch-1", "tag1")) .satisfies( anyCauseMatches( - IllegalArgumentException.class, "Tag name 'tag1' not exists.")); + IllegalArgumentException.class, "Tag 'tag1' doesn't exist.")); assertThatThrownBy(() -> table.createBranch("branch0", "test-tag")) .satisfies( @@ -1409,8 +1409,7 @@ public void testUnsupportedTagName() throws Exception { assertThatThrownBy(() -> table.createTag("", 1)) .satisfies( anyCauseMatches( - IllegalArgumentException.class, - String.format("Tag name '%s' is blank", ""))); + IllegalArgumentException.class, "Tag name shouldn't be blank")); } @Test diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java index 4ad634a43351..3040e1117521 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java @@ -107,8 +107,8 @@ public void testIndexFileExpirationWithTag() throws Exception { assertThat(indexManifestSize()).isEqualTo(3); TagManager tagManager = new TagManager(LocalFileIO.create(), table.location()); - checkIndexFiles(tagManager.taggedSnapshot("tag3")); - checkIndexFiles(tagManager.taggedSnapshot("tag5")); + checkIndexFiles(tagManager.getOrThrow("tag3")); + checkIndexFiles(tagManager.getOrThrow("tag5")); } @Test @@ -135,7 +135,7 @@ public void testIndexFileExpirationWhenDeletingTag() throws Exception { TagManager tagManager = new TagManager(LocalFileIO.create(), table.location()); checkIndexFiles(7); - checkIndexFiles(tagManager.taggedSnapshot("tag5")); + checkIndexFiles(tagManager.getOrThrow("tag5")); assertThat(indexFileSize()).isEqualTo(4); assertThat(indexManifestSize()).isEqualTo(2); } diff --git a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java index 1bebbe5fb592..4dfa802f8122 100644 --- a/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/tag/TagAutoManagerTest.java @@ -392,7 +392,8 @@ public void testExpireTagsByTimeRetained() throws Exception { snapshot1, "non-auto-create-tag-shoule-expire", Duration.ofMillis(500), - Collections.emptyList()); + Collections.emptyList(), + false); Snapshot snapshot2 = new Snapshot( @@ -416,7 +417,8 @@ public void testExpireTagsByTimeRetained() throws Exception { snapshot2, "non-auto-create-tag-shoule-not-expire", Duration.ofDays(1), - Collections.emptyList()); + Collections.emptyList(), + false); // test expire old tag by time-retained Thread.sleep(1000); diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java index 3e702b9b2cd0..abe9ee0b8a20 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/TagManagerTest.java @@ -91,9 +91,10 @@ public void testCreateTagWithoutTimeRetained() throws Exception { snapshotManager.snapshot(1), "tag", store.options().tagDefaultTimeRetained(), - Collections.emptyList()); + Collections.emptyList(), + false); assertThat(tagManager.tagExists("tag")).isTrue(); - Snapshot snapshot = tagManager.taggedSnapshot("tag"); + Snapshot snapshot = tagManager.getOrThrow("tag").trimToSnapshot(); String snapshotJson = snapshot.toJson(); Assertions.assertTrue( !snapshotJson.contains("tagCreateTime") @@ -119,7 +120,11 @@ public void testCreateTagWithTimeRetained() throws Exception { commitData(store, commitIdentifier++, writers); tagManager.createTag( - snapshotManager.snapshot(1), "tag", Duration.ofDays(1), Collections.emptyList()); + snapshotManager.snapshot(1), + "tag", + Duration.ofDays(1), + Collections.emptyList(), + false); assertThat(tagManager.tagExists("tag")).isTrue(); List> tags = tagManager.tagObjects(); Assertions.assertEquals(1, tags.size()); diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/TraceableFileIO.java b/paimon-core/src/test/java/org/apache/paimon/utils/TraceableFileIO.java index eb616a9ab294..ef811bb712f7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/TraceableFileIO.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/TraceableFileIO.java @@ -37,7 +37,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; -import java.util.function.Supplier; import java.util.stream.Collectors; import static org.apache.paimon.utils.Preconditions.checkNotNull; @@ -62,15 +61,7 @@ public class TraceableFileIO implements FileIO { @Override public PositionOutputStream newOutputStream(Path f, boolean overwrite) throws IOException { - return createOutputStream( - f, - () -> { - try { - return originalFs.newOutputStream(f, overwrite); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + return createOutputStream(f, () -> originalFs.newOutputStream(f, overwrite)); } @Override @@ -85,30 +76,22 @@ public void configure(CatalogContext context) { @Override public SeekableInputStream newInputStream(Path f) throws IOException { - return createInputStream( - f, - () -> { - try { - return originalFs.newInputStream(f); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); + return createInputStream(f, () -> originalFs.newInputStream(f)); } private PositionOutputStream createOutputStream( - Path f, Supplier streamOpener) throws IOException { + Path f, SupplierWithIOException streamOpener) throws IOException { - final Supplier wrappedStreamOpener = + final SupplierWithIOException wrappedStreamOpener = () -> new OutStream(ThreadUtils.currentStackString(), f, streamOpener.get(), this); return createStream(wrappedStreamOpener, OPEN_OUTPUT_STREAMS); } private SeekableInputStream createInputStream( - Path f, Supplier streamOpener) throws IOException { + Path f, SupplierWithIOException streamOpener) throws IOException { - final Supplier wrappedStreamOpener = + final SupplierWithIOException wrappedStreamOpener = () -> new InStream(ThreadUtils.currentStackString(), f, streamOpener.get(), this); return createStream(wrappedStreamOpener, OPEN_INPUT_STREAMS); @@ -144,7 +127,8 @@ public boolean exists(Path f) throws IOException { return originalFs.exists(f); } - private T createStream(final Supplier streamOpener, final HashSet openStreams) + private T createStream( + final SupplierWithIOException streamOpener, final HashSet openStreams) throws IOException { // open the stream outside the lock. final T out = streamOpener.get(); diff --git a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java index f79d6fb716b4..e4d1738667ff 100644 --- a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java +++ b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/procedure/ProcedurePositionalArgumentsITCase.java @@ -540,7 +540,7 @@ public void testReplaceTags() throws Exception { assertThat(paimonTable("T").snapshotManager().snapshotCount()).isEqualTo(2L); Assertions.assertThatThrownBy(() -> sql("CALL sys.replace_tag('default.T', 'test_tag')")) - .hasMessageContaining("Tag name 'test_tag' does not exist."); + .hasMessageContaining("Tag 'test_tag' doesn't exist."); sql("CALL sys.create_tag('default.T', 'test_tag')"); assertThat(sql("select tag_name,snapshot_id,time_retained from `T$tags`")) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java index 9de974d047f1..c36a6cd18668 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java @@ -31,6 +31,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.SupplierWithIOException; import javax.annotation.Nullable; @@ -176,12 +177,12 @@ private static List readAllManifestsWithIOException( } @Nullable - private static T retryReadingFiles(ReaderWithIOException reader) throws IOException { + private static T retryReadingFiles(SupplierWithIOException reader) throws IOException { int retryNumber = 0; IOException caught = null; while (retryNumber++ < READ_FILE_RETRY_NUM) { try { - return reader.read(); + return reader.get(); } catch (FileNotFoundException e) { return null; } catch (IOException e) { @@ -197,10 +198,4 @@ private static T retryReadingFiles(ReaderWithIOException reader) throws I throw caught; } - - /** A helper functional interface for method {@link #retryReadingFiles}. */ - @FunctionalInterface - interface ReaderWithIOException { - T read() throws IOException; - } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java index 0822f0461241..66d9781207a6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AutoTagForSavepointCommitterOperator.java @@ -164,9 +164,8 @@ private void createTagForIdentifiers(List identifiers) { commitOperator.getCommitUser(), identifiers); for (Snapshot snapshot : snapshotForTags) { String tagName = SAVEPOINT_TAG_PREFIX + snapshot.commitIdentifier(); - if (!tagManager.tagExists(tagName)) { - tagManager.createTag(snapshot, tagName, tagTimeRetained, callbacks); - } + // shouldn't throw exception when tag exists + tagManager.createTag(snapshot, tagName, tagTimeRetained, callbacks, true); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java index 3f2ea76ffec9..d1ae093b2fee 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperator.java @@ -108,16 +108,15 @@ private void createTag() { + localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd")); try { // If the tag already exists, delete the tag - if (tagManager.tagExists(tagName)) { - tagManager.deleteTag( - tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks()); - } + tagManager.deleteTag( + tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks()); // Create a new tag tagManager.createTag( snapshot, tagName, table.coreOptions().tagDefaultTimeRetained(), - table.store().createTagCallbacks()); + table.store().createTagCallbacks(), + false); // Expire the tag expireTag(); } catch (Exception e) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionITTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionITTest.java index 1198eb47f4b9..ee42d2e177a7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionITTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CreateTagFromWatermarkActionITTest.java @@ -80,8 +80,8 @@ public void testCreateTagsFromSnapshotsWatermark() throws Exception { Long.toString(watermark2 - 1)) .run(); assertThat(table.tagManager().tagExists("tag2")).isTrue(); - assertThat(table.tagManager().taggedSnapshot("tag2").watermark()).isEqualTo(watermark2); - assertThat(table.tagManager().taggedSnapshot("tag2").timeMillis()).isEqualTo(commitTime2); + assertThat(table.tagManager().getOrThrow("tag2").watermark()).isEqualTo(watermark2); + assertThat(table.tagManager().getOrThrow("tag2").timeMillis()).isEqualTo(commitTime2); createAction( CreateTagFromWatermarkAction.class, @@ -98,8 +98,8 @@ public void testCreateTagsFromSnapshotsWatermark() throws Exception { Long.toString(watermark2 + 1)) .run(); assertThat(table.tagManager().tagExists("tag3")).isTrue(); - assertThat(table.tagManager().taggedSnapshot("tag3").watermark()).isEqualTo(watermark3); - assertThat(table.tagManager().taggedSnapshot("tag3").timeMillis()).isEqualTo(commitTime3); + assertThat(table.tagManager().getOrThrow("tag3").watermark()).isEqualTo(watermark3); + assertThat(table.tagManager().getOrThrow("tag3").timeMillis()).isEqualTo(commitTime3); } @Test @@ -131,7 +131,7 @@ public void testCreateTagsFromTagsWatermark() throws Exception { assertThat(table.snapshotManager().snapshotExists(1)).isFalse(); - Snapshot tagSnapshot1 = table.tagManager().taggedSnapshot("tag1"); + Snapshot tagSnapshot1 = table.tagManager().getOrThrow("tag1"); long tagsCommitTime = tagSnapshot1.timeMillis(); long tagsWatermark = tagSnapshot1.watermark(); @@ -158,9 +158,8 @@ public void testCreateTagsFromTagsWatermark() throws Exception { Long.toString(tagsWatermark - 1)) .run(); assertThat(table.tagManager().tagExists("tag2")).isTrue(); - assertThat(table.tagManager().taggedSnapshot("tag2").watermark()).isEqualTo(tagsWatermark); - assertThat(table.tagManager().taggedSnapshot("tag2").timeMillis()) - .isEqualTo(tagsCommitTime); + assertThat(table.tagManager().getOrThrow("tag2").watermark()).isEqualTo(tagsWatermark); + assertThat(table.tagManager().getOrThrow("tag2").timeMillis()).isEqualTo(tagsCommitTime); createAction( CreateTagFromWatermarkAction.class, @@ -177,7 +176,7 @@ public void testCreateTagsFromTagsWatermark() throws Exception { Long.toString(watermark2 - 1)) .run(); assertThat(table.tagManager().tagExists("tag3")).isTrue(); - assertThat(table.tagManager().taggedSnapshot("tag3").watermark()).isEqualTo(watermark2); - assertThat(table.tagManager().taggedSnapshot("tag3").timeMillis()).isEqualTo(commitTime2); + assertThat(table.tagManager().getOrThrow("tag3").watermark()).isEqualTo(watermark2); + assertThat(table.tagManager().getOrThrow("tag3").timeMillis()).isEqualTo(commitTime2); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java index 7e204ca88492..23f2f0261d68 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java @@ -92,7 +92,7 @@ public void testExpireTags() throws Exception { assertThat(table.tagManager().tagExists("tag-5")).isFalse(); // tag-3 as the base older_than time - LocalDateTime olderThanTime = table.tagManager().tag("tag-3").getTagCreateTime(); + LocalDateTime olderThanTime = table.tagManager().getOrThrow("tag-3").getTagCreateTime(); java.sql.Timestamp timestamp = new java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime).getMillisecond()); createAction( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java index 00b43b9e11c9..8b14afdd7d1a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ReplaceTagActionTest.java @@ -55,12 +55,12 @@ public void testReplaceTag() throws Exception { () -> bEnv.executeSql( "CALL sys.replace_tag(`table` => 'default.T', tag => 'test_tag')")) - .hasMessageContaining("Tag name 'test_tag' does not exist."); + .hasMessageContaining("Tag 'test_tag' doesn't exist."); bEnv.executeSql("CALL sys.create_tag(`table` => 'default.T', tag => 'test_tag')"); assertThat(tagManager.tagExists("test_tag")).isEqualTo(true); - assertThat(tagManager.tag("test_tag").trimToSnapshot().id()).isEqualTo(2); - assertThat(tagManager.tag("test_tag").getTagTimeRetained()).isEqualTo(null); + assertThat(tagManager.getOrThrow("test_tag").id()).isEqualTo(2); + assertThat(tagManager.getOrThrow("test_tag").getTagTimeRetained()).isEqualTo(null); // replace tag with new time_retained createAction( @@ -77,7 +77,7 @@ public void testReplaceTag() throws Exception { "--time_retained", "1 d") .run(); - assertThat(tagManager.tag("test_tag").getTagTimeRetained().toHours()).isEqualTo(24); + assertThat(tagManager.getOrThrow("test_tag").getTagTimeRetained().toHours()).isEqualTo(24); // replace tag with new snapshot and time_retained createAction( @@ -96,7 +96,7 @@ public void testReplaceTag() throws Exception { "--time_retained", "2 d") .run(); - assertThat(tagManager.tag("test_tag").trimToSnapshot().id()).isEqualTo(1); - assertThat(tagManager.tag("test_tag").getTagTimeRetained().toHours()).isEqualTo(48); + assertThat(tagManager.getOrThrow("test_tag").id()).isEqualTo(1); + assertThat(tagManager.getOrThrow("test_tag").getTagTimeRetained().toHours()).isEqualTo(48); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromTimestampProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromTimestampProcedureITCase.java index 9e7aeb5deded..c2d2bcc10fba 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromTimestampProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromTimestampProcedureITCase.java @@ -133,7 +133,7 @@ public void testCreateTagsFromTagsCommitTime() throws Exception { FileStoreTable table = paimonTable("T"); long earliestCommitTime = table.snapshotManager().earliestSnapshot().timeMillis(); - long tagSnapshotCommitTime = table.tagManager().taggedSnapshot("tag1").timeMillis(); + long tagSnapshotCommitTime = table.tagManager().getOrThrow("tag1").timeMillis(); assertThat(tagSnapshotCommitTime < earliestCommitTime).isTrue(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java index 8e659e75c832..9255aa9563fb 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/CreateTagFromWatermarkProcedureITCase.java @@ -147,7 +147,7 @@ public void testCreateTagsFromTagsWatermark() throws Exception { assertThat(table.snapshotManager().snapshotExists(1)).isFalse(); - Snapshot tagSnapshot1 = table.tagManager().taggedSnapshot("tag1"); + Snapshot tagSnapshot1 = table.tagManager().getOrThrow("tag1"); long tagsCommitTime = tagSnapshot1.timeMillis(); long tagsWatermark = tagSnapshot1.watermark(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java index 4a89531b22a0..90e5bc6702d0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireTagsProcedureITCase.java @@ -102,7 +102,7 @@ public void testExpireTagsByOlderThanTime() throws Exception { // tag-2 as the base older_than time. // tag-1 expired by its file creation time. - LocalDateTime olderThanTime1 = table.tagManager().tag("tag-2").getTagCreateTime(); + LocalDateTime olderThanTime1 = table.tagManager().getOrThrow("tag-2").getTagCreateTime(); java.sql.Timestamp timestamp1 = new java.sql.Timestamp( Timestamp.fromLocalDateTime(olderThanTime1).getMillisecond()); @@ -119,7 +119,7 @@ public void testExpireTagsByOlderThanTime() throws Exception { // tag-4 as the base older_than time. // tag-2,tag-3,tag-5 expired, tag-5 reached its tagTimeRetained. - LocalDateTime olderThanTime2 = table.tagManager().tag("tag-4").getTagCreateTime(); + LocalDateTime olderThanTime2 = table.tagManager().getOrThrow("tag-4").getTagCreateTime(); java.sql.Timestamp timestamp2 = new java.sql.Timestamp( Timestamp.fromLocalDateTime(olderThanTime2).getMillisecond()); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java index 8a4eb791a6ad..4ee4287b7f80 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ReplaceTagProcedureITCase.java @@ -44,7 +44,7 @@ public void testExpireTagsByTagCreateTimeAndTagTimeRetained() throws Exception { () -> sql( "CALL sys.replace_tag(`table` => 'default.T', tag => 'test_tag')")) - .hasMessageContaining("Tag name 'test_tag' does not exist."); + .hasMessageContaining("Tag 'test_tag' doesn't exist."); sql("CALL sys.create_tag(`table` => 'default.T', tag => 'test_tag')"); assertThat(sql("select tag_name,snapshot_id,time_retained from `T$tags`")) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java index 987b95d6e39f..32ee2f42af72 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/BatchWriteGeneratorTagOperatorTest.java @@ -97,7 +97,8 @@ public void testBatchWriteGeneratorTag() throws Exception { // Get tagName from tagManager. String tagName = tagManager.allTagNames().get(0); // The tag is consistent with the latest snapshot - assertThat(tagManager.taggedSnapshot(tagName)).isEqualTo(snapshotManager.latestSnapshot()); + assertThat(tagManager.getOrThrow(tagName).trimToSnapshot()) + .isEqualTo(snapshotManager.latestSnapshot()); // test tag expiration table.createTag("many-tags-test1"); diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala index 1da05843dcf6..301b769288eb 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateTagFromTimestampProcedureTest.scala @@ -143,7 +143,7 @@ class CreateTagFromTimestampProcedureTest extends PaimonSparkTestBase with Strea val table = loadTable("T") val latestCommitTime = table.snapshotManager.latestSnapshot().timeMillis - val tagsCommitTime = table.tagManager().taggedSnapshot("test_tag").timeMillis + val tagsCommitTime = table.tagManager().getOrThrow("test_tag").timeMillis assert(latestCommitTime > tagsCommitTime) // make snapshot 1 expire. diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala index 65c0f2b9a203..1ac9709c8775 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/ExpireTagsProcedureTest.scala @@ -97,7 +97,7 @@ class ExpireTagsProcedureTest extends PaimonSparkTestBase { // tag-2 as the base older_than time. // tag-1 expired by its file creation time. - val olderThanTime1 = table.tagManager().tag("tag-2").getTagCreateTime + val olderThanTime1 = table.tagManager().getOrThrow("tag-2").getTagCreateTime val timestamp1 = new java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime1).getMillisecond) checkAnswer( @@ -112,7 +112,7 @@ class ExpireTagsProcedureTest extends PaimonSparkTestBase { // tag-4 as the base older_than time. // tag-2,tag-3,tag-5 expired, tag-5 reached its tagTimeRetained. - val olderThanTime2 = table.tagManager().tag("tag-4").getTagCreateTime + val olderThanTime2 = table.tagManager().getOrThrow("tag-4").getTagCreateTime val timestamp2 = new java.sql.Timestamp(Timestamp.fromLocalDateTime(olderThanTime2).getMillisecond) checkAnswer(