diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java new file mode 100644 index 000000000000..060360623cd0 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.manifest; + +import org.apache.paimon.data.BinaryRow; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +/** A {@link SimpleFileEntry} with {@link #fileSource}. */ +public class ExpireFileEntry extends SimpleFileEntry { + + @Nullable private final FileSource fileSource; + + public ExpireFileEntry( + FileKind kind, + BinaryRow partition, + int bucket, + int level, + String fileName, + List extraFiles, + @Nullable byte[] embeddedIndex, + BinaryRow minKey, + BinaryRow maxKey, + @Nullable FileSource fileSource) { + super(kind, partition, bucket, level, fileName, extraFiles, embeddedIndex, minKey, maxKey); + this.fileSource = fileSource; + } + + public Optional fileSource() { + return Optional.ofNullable(fileSource); + } + + public static ExpireFileEntry from(ManifestEntry entry) { + return new ExpireFileEntry( + entry.kind(), + entry.partition(), + entry.bucket(), + entry.level(), + entry.fileName(), + entry.file().extraFiles(), + entry.file().embeddedIndex(), + entry.minKey(), + entry.maxKey(), + entry.file().fileSource().orElse(null)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + ExpireFileEntry that = (ExpireFileEntry) o; + return fileSource == that.fileSource; + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), fileSource); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java index 91e07a369da2..a2569beac61c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java @@ -60,6 +60,8 @@ public interface FileEntry { BinaryRow maxKey(); + List extraFiles(); + /** * The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data * file. diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java index ee5dc2c34421..626e0a5d468f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java @@ -102,6 +102,11 @@ public BinaryRow maxKey() { return file.maxKey(); } + @Override + public List extraFiles() { + return file.extraFiles(); + } + public int totalBuckets() { return totalBuckets; } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java index 128f5262a553..1aba2ef19561 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java @@ -39,6 +39,7 @@ import javax.annotation.Nullable; import java.io.IOException; +import java.util.ArrayList; import java.util.List; /** @@ -84,6 +85,15 @@ public long suggestedFileSize() { return suggestedFileSize; } + public List readExpireFileEntries(String fileName, @Nullable Long fileSize) { + List entries = read(fileName, fileSize); + List result = new ArrayList<>(entries.size()); + for (ManifestEntry entry : entries) { + result.add(ExpireFileEntry.from(entry)); + } + return result; + } + /** * Write several {@link ManifestEntry}s into manifest files. * diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java index 8d33ede0c4a1..fdaed2b85aaf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java @@ -117,6 +117,11 @@ public BinaryRow maxKey() { return maxKey; } + @Override + public List extraFiles() { + return extraFiles; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java index c20405ff26c9..069e57bb3daf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java @@ -23,8 +23,8 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.manifest.ExpireFileEntry; import org.apache.paimon.manifest.IndexManifestEntry; -import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.manifest.ManifestList; @@ -60,7 +60,7 @@ public ChangelogDeletion( } @Override - public void cleanUnusedDataFiles(Changelog changelog, Predicate skipper) { + public void cleanUnusedDataFiles(Changelog changelog, Predicate skipper) { if (changelog.changelogManifestList() != null) { deleteAddedDataFiles(changelog.changelogManifestList()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java index 303a074b0cb8..cfecd767b6fb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java @@ -24,10 +24,11 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.manifest.ExpireFileEntry; import org.apache.paimon.manifest.FileEntry; +import org.apache.paimon.manifest.FileEntry.Identifier; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.IndexManifestEntry; -import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.manifest.ManifestList; @@ -46,7 +47,6 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -54,7 +54,6 @@ import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.Predicate; -import java.util.stream.Collectors; /** * Base class for file deletion including methods for clean data files, manifest files and empty @@ -110,7 +109,7 @@ public FileDeletionBase( * @param skipper if the test result of a data file is true, it will be skipped when deleting; * else it will be deleted */ - public abstract void cleanUnusedDataFiles(T snapshot, Predicate skipper); + public abstract void cleanUnusedDataFiles(T snapshot, Predicate skipper); /** * Clean metadata files that will not be used anymore of a snapshot, including data manifests, @@ -164,21 +163,23 @@ public void cleanEmptyDirectories() { deletionBuckets.clear(); } - protected void recordDeletionBuckets(ManifestEntry entry) { + protected void recordDeletionBuckets(ExpireFileEntry entry) { deletionBuckets .computeIfAbsent(entry.partition(), p -> new HashSet<>()) .add(entry.bucket()); } - public void cleanUnusedDataFiles(String manifestList, Predicate skipper) { + public void cleanUnusedDataFiles(String manifestList, Predicate skipper) { // try read manifests - List manifestFileNames = readManifestFileNames(tryReadManifestList(manifestList)); - List manifestEntries; + List manifests = tryReadManifestList(manifestList); + List manifestEntries; // data file path -> (original manifest entry, extra file paths) - Map>> dataFileToDelete = new HashMap<>(); - for (String manifest : manifestFileNames) { + Map>> dataFileToDelete = new HashMap<>(); + for (ManifestFileMeta manifest : manifests) { try { - manifestEntries = manifestFile.read(manifest); + manifestEntries = + manifestFile.readExpireFileEntries( + manifest.fileName(), manifest.fileSize()); } catch (Exception e) { // cancel deletion if any exception occurs LOG.warn("Failed to read some manifest files. Cancel deletion.", e); @@ -192,12 +193,12 @@ public void cleanUnusedDataFiles(String manifestList, Predicate s } protected void doCleanUnusedDataFile( - Map>> dataFileToDelete, - Predicate skipper) { + Map>> dataFileToDelete, + Predicate skipper) { List actualDataFileToDelete = new ArrayList<>(); dataFileToDelete.forEach( (path, pair) -> { - ManifestEntry entry = pair.getLeft(); + ExpireFileEntry entry = pair.getLeft(); // check whether we should skip the data file if (!skipper.test(entry)) { // delete data files @@ -211,20 +212,20 @@ protected void doCleanUnusedDataFile( } protected void getDataFileToDelete( - Map>> dataFileToDelete, - List dataFileEntries) { + Map>> dataFileToDelete, + List dataFileEntries) { // we cannot delete a data file directly when we meet a DELETE entry, because that // file might be upgraded - for (ManifestEntry entry : dataFileEntries) { + for (ExpireFileEntry entry : dataFileEntries) { Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket()); - Path dataFilePath = new Path(bucketPath, entry.file().fileName()); + Path dataFilePath = new Path(bucketPath, entry.fileName()); switch (entry.kind()) { case ADD: dataFileToDelete.remove(dataFilePath); break; case DELETE: - List extraFiles = new ArrayList<>(entry.file().extraFiles().size()); - for (String file : entry.file().extraFiles()) { + List extraFiles = new ArrayList<>(entry.extraFiles().size()); + for (String file : entry.extraFiles()) { extraFiles.add(new Path(bucketPath, file)); } dataFileToDelete.put(dataFilePath, Pair.of(entry, extraFiles)); @@ -242,27 +243,28 @@ protected void getDataFileToDelete( * @param manifestListName name of manifest list */ public void deleteAddedDataFiles(String manifestListName) { - List manifestFileNames = - readManifestFileNames(tryReadManifestList(manifestListName)); - for (String file : manifestFileNames) { + List manifests = tryReadManifestList(manifestListName); + for (ManifestFileMeta manifest : manifests) { try { - List manifestEntries = manifestFile.read(file); + List manifestEntries = + manifestFile.readExpireFileEntries( + manifest.fileName(), manifest.fileSize()); deleteAddedDataFiles(manifestEntries); } catch (Exception e) { // We want to delete the data file, so just ignore the unavailable files - LOG.info("Failed to read manifest " + file + ". Ignore it.", e); + LOG.info("Failed to read manifest " + manifest.fileName() + ". Ignore it.", e); } } } - private void deleteAddedDataFiles(List manifestEntries) { + private void deleteAddedDataFiles(List manifestEntries) { List dataFileToDelete = new ArrayList<>(); - for (ManifestEntry entry : manifestEntries) { + for (ExpireFileEntry entry : manifestEntries) { if (entry.kind() == FileKind.ADD) { dataFileToDelete.add( new Path( pathFactory.bucketPath(entry.partition(), entry.bucket()), - entry.file().fileName())); + entry.fileName())); recordDeletionBuckets(entry); } } @@ -327,7 +329,7 @@ protected void cleanUnusedManifests( cleanUnusedStatisticsManifests(snapshot, skippingSet); } - public Predicate createDataFileSkipperForTags( + public Predicate createDataFileSkipperForTags( List taggedSnapshots, long expiringSnapshotId) throws Exception { int index = SnapshotManager.findPreviousSnapshot(taggedSnapshots, expiringSnapshotId); // refresh tag data files @@ -358,18 +360,6 @@ protected List tryReadManifestList(String manifestListName) { } } - protected List tryReadDataManifests(Snapshot snapshot) { - List manifestFileMetas = tryReadManifestList(snapshot.baseManifestList()); - manifestFileMetas.addAll(tryReadManifestList(snapshot.deltaManifestList())); - return readManifestFileNames(manifestFileMetas); - } - - protected List readManifestFileNames(List manifestFileMetas) { - return manifestFileMetas.stream() - .map(ManifestFileMeta::fileName) - .collect(Collectors.toCollection(LinkedList::new)); - } - /** * NOTE: This method is used for building data file skipping set. If failed to read some * manifests, it will throw exception which callers must handle. @@ -377,23 +367,26 @@ protected List readManifestFileNames(List manifestFile protected void addMergedDataFiles( Map>> dataFiles, Snapshot snapshot) throws IOException { - for (ManifestEntry entry : readMergedDataFiles(snapshot)) { + for (ExpireFileEntry entry : readMergedDataFiles(snapshot)) { dataFiles .computeIfAbsent(entry.partition(), p -> new HashMap<>()) .computeIfAbsent(entry.bucket(), b -> new HashSet<>()) - .add(entry.file().fileName()); + .add(entry.fileName()); } } - protected Collection readMergedDataFiles(Snapshot snapshot) throws IOException { + protected Collection readMergedDataFiles(Snapshot snapshot) + throws IOException { // read data manifests - List files = tryReadDataManifests(snapshot); + + List manifests = tryReadManifestList(snapshot.baseManifestList()); + manifests.addAll(tryReadManifestList(snapshot.deltaManifestList())); // read and merge manifest entries - Map map = new HashMap<>(); - for (String manifest : files) { - List entries; - entries = manifestFile.readWithIOException(manifest); + Map map = new HashMap<>(); + for (ManifestFileMeta manifest : manifests) { + List entries = + manifestFile.readExpireFileEntries(manifest.fileName(), manifest.fileSize()); FileEntry.mergeEntries(entries, map); } @@ -401,12 +394,12 @@ protected Collection readMergedDataFiles(Snapshot snapshot) throw } protected boolean containsDataFile( - Map>> dataFiles, ManifestEntry testee) { - Map> buckets = dataFiles.get(testee.partition()); + Map>> dataFiles, ExpireFileEntry entry) { + Map> buckets = dataFiles.get(entry.partition()); if (buckets != null) { - Set fileNames = buckets.get(testee.bucket()); + Set fileNames = buckets.get(entry.bucket()); if (fileNames != null) { - return fileNames.contains(testee.file().fileName()); + return fileNames.contains(entry.fileName()); } } return false; diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java index d86907ecea54..7d55b64c8eac 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java @@ -23,8 +23,8 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.manifest.ExpireFileEntry; import org.apache.paimon.manifest.FileSource; -import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.stats.StatsFileHandler; @@ -65,15 +65,15 @@ public SnapshotDeletion( } @Override - public void cleanUnusedDataFiles(Snapshot snapshot, Predicate skipper) { + public void cleanUnusedDataFiles(Snapshot snapshot, Predicate skipper) { if (changelogDecoupled && !produceChangelog) { // Skip clean the 'APPEND' data files.If we do not have the file source information // eg: the old version table file, we just skip clean this here, let it done by // ExpireChangelogImpl - Predicate enriched = + Predicate enriched = manifestEntry -> skipper.test(manifestEntry) - || (manifestEntry.file().fileSource().orElse(FileSource.APPEND) + || (manifestEntry.fileSource().orElse(FileSource.APPEND) == FileSource.APPEND); cleanUnusedDataFiles(snapshot.deltaManifestList(), enriched); } else { @@ -92,8 +92,8 @@ public void cleanUnusedManifests(Snapshot snapshot, Set skippingSet) { } @VisibleForTesting - void cleanUnusedDataFile(List dataFileLog) { - Map>> dataFileToDelete = new HashMap<>(); + void cleanUnusedDataFile(List dataFileLog) { + Map>> dataFileToDelete = new HashMap<>(); getDataFileToDelete(dataFileToDelete, dataFileLog); doCleanUnusedDataFile(dataFileToDelete, f -> false); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java index a6cd338d5859..2722ed0c7ec8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java @@ -23,7 +23,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; -import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ExpireFileEntry; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.stats.StatsFileHandler; @@ -68,8 +68,8 @@ public TagDeletion( } @Override - public void cleanUnusedDataFiles(Snapshot taggedSnapshot, Predicate skipper) { - Collection manifestEntries; + public void cleanUnusedDataFiles(Snapshot taggedSnapshot, Predicate skipper) { + Collection manifestEntries; try { manifestEntries = readMergedDataFiles(taggedSnapshot); } catch (IOException e) { @@ -78,11 +78,11 @@ public void cleanUnusedDataFiles(Snapshot taggedSnapshot, Predicate dataFileToDelete = new HashSet<>(); - for (ManifestEntry entry : manifestEntries) { + for (ExpireFileEntry entry : manifestEntries) { if (!skipper.test(entry)) { Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket()); - dataFileToDelete.add(new Path(bucketPath, entry.file().fileName())); - for (String file : entry.file().extraFiles()) { + dataFileToDelete.add(new Path(bucketPath, entry.fileName())); + for (String file : entry.extraFiles()) { dataFileToDelete.add(new Path(bucketPath, file)); } @@ -98,11 +98,12 @@ public void cleanUnusedManifests(Snapshot taggedSnapshot, Set skippingSe cleanUnusedManifests(taggedSnapshot, skippingSet, true, false); } - public Predicate dataFileSkipper(Snapshot fromSnapshot) throws Exception { + public Predicate dataFileSkipper(Snapshot fromSnapshot) throws Exception { return dataFileSkipper(Collections.singletonList(fromSnapshot)); } - public Predicate dataFileSkipper(List fromSnapshots) throws Exception { + public Predicate dataFileSkipper(List fromSnapshots) + throws Exception { Map>> skipped = new HashMap<>(); for (Snapshot snapshot : fromSnapshots) { addMergedDataFiles(skipped, snapshot); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java index ce5497545055..1ffa7485aee5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java @@ -21,7 +21,7 @@ import org.apache.paimon.Changelog; import org.apache.paimon.Snapshot; import org.apache.paimon.consumer.ConsumerManager; -import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ExpireFileEntry; import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.options.ExpireConfig; import org.apache.paimon.utils.Preconditions; @@ -147,7 +147,7 @@ public int expireUntil(long earliestId, long endExclusiveId) { LOG.debug("Ready to delete changelog files from changelog #" + id); } Changelog changelog = snapshotManager.longLivedChangelog(id); - Predicate skipper; + Predicate skipper; try { skipper = changelogDeletion.createDataFileSkipperForTags(taggedSnapshots, id); } catch (Exception e) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java index 2c83b63c97ef..dc1c2d6bdbc5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java @@ -22,7 +22,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.consumer.ConsumerManager; -import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ExpireFileEntry; import org.apache.paimon.operation.SnapshotDeletion; import org.apache.paimon.options.ExpireConfig; import org.apache.paimon.utils.Preconditions; @@ -176,7 +176,7 @@ public int expireUntil(long earliestId, long endExclusiveId) { continue; } // expire merge tree files and collect changed buckets - Predicate skipper; + Predicate skipper; try { skipper = snapshotDeletion.createDataFileSkipperForTags(taggedSnapshots, id); } catch (Exception e) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java index 1eb4ccf00136..29fecec11353 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java @@ -21,7 +21,7 @@ import org.apache.paimon.Changelog; import org.apache.paimon.Snapshot; import org.apache.paimon.fs.FileIO; -import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ExpireFileEntry; import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.operation.SnapshotDeletion; import org.apache.paimon.operation.TagDeletion; @@ -205,7 +205,7 @@ private List cleanTagsDataFiles(Snapshot retainedSnapshot) { } // delete data files - Predicate dataFileSkipper = null; + Predicate dataFileSkipper = null; boolean success = true; try { dataFileSkipper = tagDeletion.dataFileSkipper(retainedSnapshot); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 1e05a100d741..4019395d8d65 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -23,7 +23,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; -import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ExpireFileEntry; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.table.sink.TagCallback; import org.apache.paimon.tag.Tag; @@ -255,7 +255,7 @@ private void doClean( skippedSnapshots.add(right); // delete data files and empty directories - Predicate dataFileSkipper = null; + Predicate dataFileSkipper = null; boolean success = true; try { dataFileSkipper = tagDeletion.dataFileSkipper(skippedSnapshots); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 96dce3d78426..9dc98343734b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -29,6 +29,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.ExpireFileEntry; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.FileSource; import org.apache.paimon.manifest.ManifestEntry; @@ -218,7 +219,9 @@ public void testExpireExtraFiles() throws IOException { ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 0, 1, dataFile); // expire - expire.snapshotDeletion().cleanUnusedDataFile(Arrays.asList(add, delete)); + expire.snapshotDeletion() + .cleanUnusedDataFile( + Arrays.asList(ExpireFileEntry.from(add), ExpireFileEntry.from(delete))); // check assertThat(fileIO.exists(myDataFile)).isFalse();