Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import java.io.IOException;
import java.util.function.Supplier;

/** A {@link Supplier} with {@link IOException}. */
@FunctionalInterface
public interface SupplierWithIOException<T> {

T get() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializableConsumer;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.SupplierWithIOException;
import org.apache.paimon.utils.TagManager;

import org.slf4j.Logger;
Expand Down Expand Up @@ -322,13 +323,13 @@ protected List<FileStatus> tryBestListingDirs(Path dir) {
* {@link FileNotFoundException}, return default value. Finally, if retry times reaches the
* limits, rethrow the IOException.
*/
protected static <T> T retryReadingFiles(ReaderWithIOException<T> reader, T defaultValue)
protected static <T> T retryReadingFiles(SupplierWithIOException<T> reader, T defaultValue)
throws IOException {
int retryNumber = 0;
IOException caught = null;
while (retryNumber++ < READ_FILE_RETRY_NUM) {
try {
return reader.read();
return reader.get();
} catch (FileNotFoundException e) {
return defaultValue;
} catch (IOException e) {
Expand All @@ -349,13 +350,6 @@ protected boolean oldEnough(FileStatus status) {
return status.getModificationTime() < olderThanMillis;
}

/** A helper functional interface for method {@link #retryReadingFiles}. */
@FunctionalInterface
protected interface ReaderWithIOException<T> {

T read() throws IOException;
}

public static SerializableConsumer<Path> createFileCleaner(
Catalog catalog, @Nullable Boolean dryRun) {
SerializableConsumer<Path> fileCleaner;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ private Optional<TableSchema> travelToVersion(String version, Options options) {
}

private Optional<TableSchema> travelToTag(String tagName, Options options) {
return travelToSnapshot(tagManager().taggedSnapshot(tagName), options);
return travelToSnapshot(tagManager().getOrThrow(tagName).trimToSnapshot(), options);
}

private Optional<TableSchema> travelToSnapshot(long snapshotId, Options options) {
Expand Down Expand Up @@ -633,7 +633,9 @@ public void createTag(String tagName, Duration timeRetained) {
}

private void createTag(String tagName, Snapshot fromSnapshot, @Nullable Duration timeRetained) {
tagManager().createTag(fromSnapshot, tagName, timeRetained, store().createTagCallbacks());
tagManager()
.createTag(
fromSnapshot, tagName, timeRetained, store().createTagCallbacks(), false);
}

@Override
Expand Down Expand Up @@ -689,7 +691,7 @@ public void rollbackTo(String tagName) {
TagManager tagManager = tagManager();
checkArgument(tagManager.tagExists(tagName), "Rollback tag '%s' doesn't exist.", tagName);

Snapshot taggedSnapshot = tagManager.taggedSnapshot(tagName);
Snapshot taggedSnapshot = tagManager.getOrThrow(tagName).trimToSnapshot();
rollbackHelper().cleanLargerThan(taggedSnapshot);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,36 +25,27 @@
/** {@link StartingScanner} for incremental changes by tag. */
public class IncrementalTagStartingScanner extends AbstractStartingScanner {

private final String start;
private final String end;
private final Snapshot start;
private final Snapshot end;

public IncrementalTagStartingScanner(
SnapshotManager snapshotManager, String start, String end) {
SnapshotManager snapshotManager, String startTagName, String endTagName) {
super(snapshotManager);
this.start = start;
this.end = end;
TagManager tagManager =
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
Snapshot startingSnapshot = tagManager.taggedSnapshot(start);
if (startingSnapshot != null) {
this.startingSnapshotId = startingSnapshot.id();
}
}

@Override
public Result scan(SnapshotReader reader) {
TagManager tagManager =
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
Snapshot tag1 = tagManager.taggedSnapshot(start);
Snapshot tag2 = tagManager.taggedSnapshot(end);

if (tag2.id() <= tag1.id()) {
start = tagManager.getOrThrow(startTagName).trimToSnapshot();
end = tagManager.getOrThrow(endTagName).trimToSnapshot();
if (end.id() <= start.id()) {
throw new IllegalArgumentException(
String.format(
"Tag end %s with snapshot id %s should be larger than tag start %s with snapshot id %s",
end, tag2.id(), start, tag1.id()));
endTagName, end.id(), startTagName, start.id()));
}
this.startingSnapshotId = start.id();
}

return StartingScanner.fromPlan(reader.withSnapshot(tag2).readIncrementalDiff(tag1));
@Override
public Result scan(SnapshotReader reader) {
return StartingScanner.fromPlan(reader.withSnapshot(end).readIncrementalDiff(start));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public ScanMode startingScanMode() {
public SnapshotReader configure(SnapshotReader snapshotReader) {
TagManager tagManager =
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
Snapshot snapshot = tagManager.taggedSnapshot(tagName);
Snapshot snapshot = tagManager.getOrThrow(tagName).trimToSnapshot();
return snapshotReader.withMode(ScanMode.ALL).withSnapshot(snapshot);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,6 @@ private static Snapshot resolveSnapshotByTagName(
String tagName = options.scanTagName();
TagManager tagManager =
new TagManager(snapshotManager.fileIO(), snapshotManager.tablePath());
return tagManager.taggedSnapshot(tagName);
return tagManager.getOrThrow(tagName).trimToSnapshot();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
Expand Down Expand Up @@ -227,27 +228,25 @@ public RecordReader<InternalRow> createReader(Split split) {
&& ((LeafPredicate) predicate).literals().get(0) instanceof BinaryString
&& predicate.visit(LeafPredicateExtractor.INSTANCE).get(TAG_NAME) != null) {
String equalValue = ((LeafPredicate) predicate).literals().get(0).toString();
if (tagManager.tagExists(equalValue)) {
predicateMap.put(equalValue, tagManager.tag(equalValue));
}
tagManager.get(equalValue).ifPresent(tag -> predicateMap.put(equalValue, tag));
}

if (predicate instanceof CompoundPredicate) {
CompoundPredicate compoundPredicate = (CompoundPredicate) predicate;
// optimize for IN filter
if ((compoundPredicate.function()) instanceof Or) {
List<String> tagNames = new ArrayList<>();
InPredicateVisitor.extractInElements(predicate, TAG_NAME)
.ifPresent(
leafs ->
leafs.forEach(
leaf -> {
String leftName = leaf.toString();
if (tagManager.tagExists(leftName)) {
predicateMap.put(
leftName,
tagManager.tag(leftName));
}
}));
e ->
e.stream()
.map(Object::toString)
.forEach(tagNames::add));
tagNames.forEach(
name ->
tagManager
.get(name)
.ifPresent(value -> predicateMap.put(name, value)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,8 @@ private void tryToCreateTags(Snapshot snapshot) {
}
String tagName = periodHandler.timeToTag(thisTag);
LOG.info("The tag name is {}.", tagName);
if (!tagManager.tagExists(tagName)) {
tagManager.createTag(snapshot, tagName, defaultTimeRetained, callbacks);
}
// shouldn't throw exception when tag exists
tagManager.createTag(snapshot, tagName, defaultTimeRetained, callbacks, true);
nextTag = periodHandler.nextTagTime(thisTag);
LOG.info("The next tag time after this is {}.", nextTag);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,7 @@ branchName, branchPath(tablePath, branchName)),

public void createBranch(String branchName, String tagName) {
validateBranch(branchName);
checkArgument(tagManager.tagExists(tagName), "Tag name '%s' not exists.", tagName);

Snapshot snapshot = tagManager.taggedSnapshot(tagName);
Snapshot snapshot = tagManager.getOrThrow(tagName).trimToSnapshot();

try {
// Copy the corresponding tag, snapshot and schema files into the branch directory
Expand Down
87 changes: 49 additions & 38 deletions paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
Expand Down Expand Up @@ -97,31 +98,34 @@ public List<Path> tagPaths(Predicate<Path> predicate) throws IOException {

/** Create a tag from given snapshot and save it in the storage. */
public void createTag(
Snapshot snapshot, String tagName, Duration timeRetained, List<TagCallback> callbacks) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName);
checkArgument(!tagExists(tagName), "Tag name '%s' already exists.", tagName);
Snapshot snapshot,
String tagName,
Duration timeRetained,
List<TagCallback> callbacks,
boolean ignoreIfExists) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name shouldn't be blank.");
if (tagExists(tagName)) {
checkArgument(ignoreIfExists, "Tag '%s' already exists.", tagName);
return;
}
createOrReplaceTag(snapshot, tagName, timeRetained, callbacks);
}

/** Replace a tag from given snapshot and save it in the storage. */
public void replaceTag(Snapshot snapshot, String tagName, Duration timeRetained) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName);
checkArgument(tagExists(tagName), "Tag name '%s' does not exist.", tagName);
checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name shouldn't be blank.");
checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
createOrReplaceTag(snapshot, tagName, timeRetained, null);
}

public void createOrReplaceTag(
private void createOrReplaceTag(
Snapshot snapshot,
String tagName,
@Nullable Duration timeRetained,
@Nullable List<TagCallback> callbacks) {
// When timeRetained is not defined, please do not write the tagCreatorTime field,
// as this will cause older versions (<= 0.7) of readers to be unable to read this
// tag.
// When timeRetained is defined, it is fine, because timeRetained is the new
// feature.
// When timeRetained is not defined, please do not write the tagCreatorTime field, as this
// will cause older versions (<= 0.7) of readers to be unable to read this tag.
// When timeRetained is defined, it is fine, because timeRetained is the new feature.
String content =
timeRetained != null
? Tag.fromSnapshotAndTagTtl(snapshot, timeRetained, LocalDateTime.now())
Expand Down Expand Up @@ -152,17 +156,17 @@ public void createOrReplaceTag(
}

public void renameTag(String tagName, String targetTagName) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(tagName),
"Original tag name shouldn't be blank.");
checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);

checkArgument(
!StringUtils.isNullOrWhitespaceOnly(targetTagName),
"New tag name shouldn't be blank.");
checkArgument(!tagExists(targetTagName), "Tag '%s' already exists.", tagName);

try {
if (!tagExists(tagName)) {
throw new RuntimeException(
String.format("The specified tag name [%s] does not exist.", tagName));
}
if (tagExists(targetTagName)) {
throw new RuntimeException(
String.format(
"The specified target tag name [%s] existed, please set a non-existent tag name.",
targetTagName));
}
fileIO.rename(tagPath(tagName), tagPath(targetTagName));
} catch (IOException e) {
throw new RuntimeException(e);
Expand All @@ -172,7 +176,7 @@ public void renameTag(String tagName, String targetTagName) {
/** Make sure the tagNames are ALL tags of one snapshot. */
public void deleteAllTagsOfOneSnapshot(
List<String> tagNames, TagDeletion tagDeletion, SnapshotManager snapshotManager) {
Snapshot taggedSnapshot = taggedSnapshot(tagNames.get(0));
Snapshot taggedSnapshot = getOrThrow(tagNames.get(0)).trimToSnapshot();
List<Snapshot> taggedSnapshots;

// skip file deletion if snapshot exists
Expand All @@ -188,19 +192,20 @@ public void deleteAllTagsOfOneSnapshot(
doClean(taggedSnapshot, taggedSnapshots, snapshotManager, tagDeletion);
}

/** Ignore errors if the tag doesn't exist. */
public void deleteTag(
String tagName,
TagDeletion tagDeletion,
SnapshotManager snapshotManager,
List<TagCallback> callbacks) {
checkArgument(
!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name '%s' is blank.", tagName);
if (!tagExists(tagName)) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name shouldn't be blank.");
Optional<Tag> tag = get(tagName);
if (!tag.isPresent()) {
LOG.warn("Tag '{}' doesn't exist.", tagName);
return;
}

Snapshot taggedSnapshot = taggedSnapshot(tagName);
Snapshot taggedSnapshot = tag.get().trimToSnapshot();
List<Snapshot> taggedSnapshots;

// skip file deletion if snapshot exists
Expand Down Expand Up @@ -303,10 +308,21 @@ public boolean tagExists(String tagName) {
}
}

/** Get the tagged snapshot by name. */
public Snapshot taggedSnapshot(String tagName) {
checkArgument(tagExists(tagName), "Tag '%s' doesn't exist.", tagName);
return Tag.fromPath(fileIO, tagPath(tagName)).trimToSnapshot();
/** Return the tag or Optional.empty() if the tag file not found. */
public Optional<Tag> get(String tagName) {
checkArgument(!StringUtils.isNullOrWhitespaceOnly(tagName), "Tag name shouldn't be blank.");
try {
return Optional.of(Tag.tryFromPath(fileIO, tagPath(tagName)));
} catch (FileNotFoundException e) {
return Optional.empty();
}
}

/** Return the tag or throw exception indicating the tag not found. */
public Tag getOrThrow(String tagName) {
return get(tagName)
.orElseThrow(
() -> new IllegalArgumentException("Tag '" + tagName + "' doesn't exist."));
}

public long tagCount() {
Expand Down Expand Up @@ -410,12 +426,7 @@ private int findIndex(Snapshot taggedSnapshot, List<Snapshot> taggedSnapshots) {
}
throw new RuntimeException(
String.format(
"Didn't find tag with snapshot id '%s'.This is unexpected.",
"Didn't find tag with snapshot id '%s'. This is unexpected.",
taggedSnapshot.id()));
}

/** Read tag for tagName. */
public Tag tag(String tagName) {
return Tag.fromPath(fileIO, tagPath(tagName));
}
}
Loading
Loading