diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java index f561546e8bb3..9301dab3eba2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java @@ -41,6 +41,7 @@ import org.apache.paimon.iceberg.metadata.IcebergSnapshot; import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.options.Options; @@ -446,11 +447,16 @@ private boolean collectFileChanges( Map removedFiles, Map> addedFiles) { boolean isAddOnly = true; + Map, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>(); for (ManifestEntry entry : manifestEntries) { - String path = - fileStorePathFactory.bucketPath(entry.partition(), entry.bucket()) - + "/" - + entry.fileName(); + Pair bucket = Pair.of(entry.partition(), entry.bucket()); + DataFilePathFactory dataFilePathFactory = + dataFilePathFactoryMap.computeIfAbsent( + bucket, + b -> + fileStorePathFactory.createDataFilePathFactory( + entry.partition(), entry.bucket())); + String path = dataFilePathFactory.toPath(entry).toString(); switch (entry.kind()) { case ADD: if (shouldAddFileToIceberg(entry.file())) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java index a70a795ef02d..eca6258e2eee 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java @@ -110,6 +110,14 @@ public Path toAlignedPath(String fileName, DataFileMeta aligned) { return new Path(aligned.externalPathDir().map(Path::new).orElse(parent), fileName); } + public Path toAlignedPath(String fileName, FileEntry aligned) { + Optional externalPathDir = + Optional.ofNullable(aligned.externalPath()) + .map(Path::new) + .map(p -> p.getParent().toUri().toString()); + return new Path(externalPathDir.map(Path::new).orElse(parent), fileName); + } + public static Path dataFileToFileIndexPath(Path dataFilePath) { return new Path(dataFilePath.getParent(), dataFilePath.getName() + INDEX_PATH_SUFFIX); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java index cfecd767b6fb..1a029245078f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java @@ -24,6 +24,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.manifest.ExpireFileEntry; import org.apache.paimon.manifest.FileEntry; import org.apache.paimon.manifest.FileEntry.Identifier; @@ -216,9 +217,16 @@ protected void getDataFileToDelete( List dataFileEntries) { // we cannot delete a data file directly when we meet a DELETE entry, because that // file might be upgraded + Map, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>(); for (ExpireFileEntry entry : dataFileEntries) { - Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket()); - Path dataFilePath = new Path(bucketPath, entry.fileName()); + Pair bucket = Pair.of(entry.partition(), entry.bucket()); + DataFilePathFactory dataFilePathFactory = + dataFilePathFactoryMap.computeIfAbsent( + bucket, + b -> + pathFactory.createDataFilePathFactory( + entry.partition(), entry.bucket())); + Path dataFilePath = dataFilePathFactory.toPath(entry); switch (entry.kind()) { case ADD: dataFileToDelete.remove(dataFilePath); @@ -226,7 +234,7 @@ protected void getDataFileToDelete( case DELETE: List extraFiles = new ArrayList<>(entry.extraFiles().size()); for (String file : entry.extraFiles()) { - extraFiles.add(new Path(bucketPath, file)); + extraFiles.add(dataFilePathFactory.toAlignedPath(file, entry)); } dataFileToDelete.put(dataFilePath, Pair.of(entry, extraFiles)); break; @@ -259,12 +267,17 @@ public void deleteAddedDataFiles(String manifestListName) { private void deleteAddedDataFiles(List manifestEntries) { List dataFileToDelete = new ArrayList<>(); + Map, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>(); for (ExpireFileEntry entry : manifestEntries) { + Pair bucket = Pair.of(entry.partition(), entry.bucket()); + DataFilePathFactory dataFilePathFactory = + dataFilePathFactoryMap.computeIfAbsent( + bucket, + b -> + pathFactory.createDataFilePathFactory( + entry.partition(), entry.bucket())); if (entry.kind() == FileKind.ADD) { - dataFileToDelete.add( - new Path( - pathFactory.bucketPath(entry.partition(), entry.bucket()), - entry.fileName())); + dataFileToDelete.add(dataFilePathFactory.toPath(entry)); recordDeletionBuckets(entry); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index 6a4276662468..c64b866929a5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -97,7 +97,7 @@ public CleanOrphanFilesResult clean() // specially handle to clear snapshot dir cleanSnapshotDir(branches, deleteFiles::add, deletedFilesLenInBytes::addAndGet); - // delete candidate files + // get candidate files Map> candidates = getCandidateDeletingFiles(); if (candidates.isEmpty()) { return new CleanOrphanFilesResult( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 54e082091840..2c669c3ea9e0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -262,6 +262,14 @@ protected List listPaimonFileDirs() { paimonFileDirs.add(pathFactory.statisticsPath()); paimonFileDirs.addAll(listFileDirs(pathFactory.dataFilePath(), partitionKeysNum)); + // add external data paths + String dataFileExternalPaths = table.store().options().dataFileExternalPaths(); + if (dataFileExternalPaths != null) { + String[] externalPathArr = dataFileExternalPaths.split(","); + for (String externalPath : externalPathArr) { + paimonFileDirs.addAll(listFileDirs(new Path(externalPath), partitionKeysNum)); + } + } return paimonFileDirs; } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java index 2722ed0c7ec8..532b902c185b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java @@ -23,11 +23,13 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.manifest.ExpireFileEntry; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.stats.StatsFileHandler; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,12 +80,19 @@ public void cleanUnusedDataFiles(Snapshot taggedSnapshot, Predicate dataFileToDelete = new HashSet<>(); + Map, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>(); for (ExpireFileEntry entry : manifestEntries) { + Pair bucket = Pair.of(entry.partition(), entry.bucket()); + DataFilePathFactory dataFilePathFactory = + dataFilePathFactoryMap.computeIfAbsent( + bucket, + b -> + pathFactory.createDataFilePathFactory( + entry.partition(), entry.bucket())); if (!skipper.test(entry)) { - Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket()); - dataFileToDelete.add(new Path(bucketPath, entry.fileName())); + dataFileToDelete.add(dataFilePathFactory.toPath(entry)); for (String file : entry.extraFiles()) { - dataFileToDelete.add(new Path(bucketPath, file)); + dataFileToDelete.add(dataFilePathFactory.toAlignedPath(file, entry)); } recordDeletionBuckets(entry); diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index 0d8ea5f4a49a..48fa81c525d8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -25,6 +25,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.IndexIncrement; import org.apache.paimon.manifest.FileEntry; import org.apache.paimon.manifest.FileKind; @@ -58,6 +59,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.RecordWriter; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -643,11 +645,17 @@ private static Set getSnapshotFileInUse( .flatMap(m -> manifestFile.read(m.fileName()).stream()) .collect(Collectors.toList()); entries = new ArrayList<>(FileEntry.mergeEntries(entries)); + Map, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>(); + for (ManifestEntry entry : entries) { - result.add( - new Path( - pathFactory.bucketPath(entry.partition(), entry.bucket()), - entry.file().fileName())); + Pair bucket = Pair.of(entry.partition(), entry.bucket()); + DataFilePathFactory dataFilePathFactory = + dataFilePathFactoryMap.computeIfAbsent( + bucket, + b -> + pathFactory.createDataFilePathFactory( + entry.partition(), entry.bucket())); + result.add(dataFilePathFactory.toPath(entry)); } // Add 'DELETE' 'APPEND' file in snapshot @@ -666,10 +674,14 @@ private static Set getSnapshotFileInUse( if (entry.kind() == FileKind.DELETE && entry.file().fileSource().orElse(FileSource.APPEND) == FileSource.APPEND) { - result.add( - new Path( - pathFactory.bucketPath(entry.partition(), entry.bucket()), - entry.file().fileName())); + Pair bucket = Pair.of(entry.partition(), entry.bucket()); + DataFilePathFactory dataFilePathFactory = + dataFilePathFactoryMap.computeIfAbsent( + bucket, + b -> + pathFactory.createDataFilePathFactory( + entry.partition(), entry.bucket())); + result.add(dataFilePathFactory.toPath(entry)); } } } @@ -694,6 +706,7 @@ private static Set getChangelogFileInUse( // changelog file result.add(changelogPath); + Map, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>(); // data file // not all manifests contains useful data file // (1) produceChangelog = 'true': data file in changelog manifests @@ -716,10 +729,14 @@ private static Set getChangelogFileInUse( .collect(Collectors.toList()); for (ManifestEntry entry : files) { if (entry.file().fileSource().orElse(FileSource.APPEND) == FileSource.APPEND) { - result.add( - new Path( - pathFactory.bucketPath(entry.partition(), entry.bucket()), - entry.file().fileName())); + Pair bucket = Pair.of(entry.partition(), entry.bucket()); + DataFilePathFactory dataFilePathFactory = + dataFilePathFactoryMap.computeIfAbsent( + bucket, + b -> + pathFactory.createDataFilePathFactory( + entry.partition(), entry.bucket())); + result.add(dataFilePathFactory.toPath(entry)); } } } else if (changelog.changelogManifestList() != null) { @@ -731,10 +748,14 @@ private static Set getChangelogFileInUse( .flatMap(m -> manifestFile.read(m.fileName()).stream()) .collect(Collectors.toList()); for (ManifestEntry entry : files) { - result.add( - new Path( - pathFactory.bucketPath(entry.partition(), entry.bucket()), - entry.file().fileName())); + Pair bucket = Pair.of(entry.partition(), entry.bucket()); + DataFilePathFactory dataFilePathFactory = + dataFilePathFactoryMap.computeIfAbsent( + bucket, + b -> + pathFactory.createDataFilePathFactory( + entry.partition(), entry.bucket())); + result.add(dataFilePathFactory.toPath(entry)); } } return result; diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 5811fc37216b..ed1489ca7eb3 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.operation; import org.apache.paimon.CoreOptions; +import org.apache.paimon.CoreOptions.ExternalPathStrategy; import org.apache.paimon.KeyValue; import org.apache.paimon.Snapshot; import org.apache.paimon.TestFileStore; @@ -29,6 +30,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.manifest.ExpireFileEntry; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.FileSource; @@ -74,6 +76,7 @@ public class ExpireSnapshotsTest { protected final FileIO fileIO = new LocalFileIO(); protected TestKeyValueGenerator gen; @TempDir java.nio.file.Path tempDir; + @TempDir java.nio.file.Path tempExternalPath; protected TestFileStore store; protected SnapshotManager snapshotManager; @@ -134,6 +137,67 @@ public void testExpireWithMissingFiles() throws Exception { assertSnapshot(latestSnapshotId, allData, snapshotPositions); } + @Test + public void testExpireWithMissingFilesWithExternalPath() throws Exception { + String externalPath = "file://" + tempExternalPath.toString(); + store.options().toConfiguration().set(CoreOptions.DATA_FILE_EXTERNAL_PATHS, externalPath); + store.options() + .toConfiguration() + .set( + CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY, + ExternalPathStrategy.ROUND_ROBIN); + ExpireSnapshots expire = store.newExpire(1, 1, 1); + + List allData = new ArrayList<>(); + List snapshotPositions = new ArrayList<>(); + commit(5, allData, snapshotPositions); + + int latestSnapshotId = requireNonNull(snapshotManager.latestSnapshotId()).intValue(); + Set filesInUseTmp = store.getFilesInUse(latestSnapshotId); + + Set filesInUse = + filesInUseTmp.stream() + .map(p -> new Path(Paths.get(p.toUri().getPath()).toString())) + .collect(Collectors.toSet()); + List unusedFileList = + Files.walk(Paths.get(tempDir.toString())) + .filter(Files::isRegularFile) + .filter(p -> !p.getFileName().toString().startsWith("snapshot")) + .filter(p -> !p.getFileName().toString().startsWith("schema")) + .map(p -> new Path(p.toString())) + .filter(p -> !filesInUse.contains(p)) + .collect(Collectors.toList()); + unusedFileList.addAll( + Files.walk(Paths.get(tempExternalPath.toString())) + .filter(Files::isRegularFile) + .filter(p -> !p.getFileName().toString().startsWith("snapshot")) + .filter(p -> !p.getFileName().toString().startsWith("schema")) + .map(p -> new Path(p.toString())) + .filter(p -> !filesInUse.contains(p)) + .collect(Collectors.toList())); + + // shuffle list + ThreadLocalRandom random = ThreadLocalRandom.current(); + for (int i = unusedFileList.size() - 1; i > 0; i--) { + int j = random.nextInt(i + 1); + Collections.swap(unusedFileList, i, j); + } + + // delete some unused files + int numFilesToDelete = random.nextInt(unusedFileList.size()); + for (int i = 0; i < numFilesToDelete; i++) { + fileIO.deleteQuietly(unusedFileList.get(i)); + } + + expire.expire(); + + for (int i = 1; i < latestSnapshotId; i++) { + assertThat(snapshotManager.snapshotExists(i)).isFalse(); + } + assertThat(snapshotManager.snapshotExists(latestSnapshotId)).isTrue(); + assertSnapshot(latestSnapshotId, allData, snapshotPositions); + } + @Test public void testMixedSnapshotAndTagDeletion() throws Exception { List allData = new ArrayList<>(); @@ -233,6 +297,67 @@ public void testExpireExtraFiles() throws IOException { store.assertCleaned(); } + @Test + public void testExpireExtraFilesWithExternalPath() throws IOException { + String externalPath = "file://" + tempExternalPath.toString(); + store.options().toConfiguration().set(CoreOptions.DATA_FILE_EXTERNAL_PATHS, externalPath); + store.options() + .toConfiguration() + .set( + CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY, + ExternalPathStrategy.ROUND_ROBIN); + ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) store.newExpire(1, 3, Long.MAX_VALUE); + // write test files + BinaryRow partition = gen.getPartition(gen.next()); + + DataFilePathFactory dataFilePathFactory = + store.pathFactory().createDataFilePathFactory(partition, 0); + Path myDataFile = dataFilePathFactory.newPath(); + String fileName = myDataFile.getName(); + new LocalFileIO().tryToWriteAtomic(myDataFile, "1"); + Path extra1 = new Path(myDataFile.getParent(), "extra1"); + fileIO.tryToWriteAtomic(extra1, "2"); + Path extra2 = new Path(myDataFile.getParent(), "extra2"); + fileIO.tryToWriteAtomic(extra2, "3"); + + // create DataFileMeta and ManifestEntry + List extraFiles = Arrays.asList("extra1", "extra2"); + DataFileMeta dataFile = + new DataFileMeta( + fileName, + 1, + 1, + EMPTY_ROW, + EMPTY_ROW, + null, + null, + 0, + 1, + 0, + 0, + extraFiles, + Timestamp.now(), + 0L, + null, + FileSource.APPEND, + null, + myDataFile.toString()); + ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, dataFile); + ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 0, 1, dataFile); + + // expire + expire.snapshotDeletion() + .cleanUnusedDataFile( + Arrays.asList(ExpireFileEntry.from(add), ExpireFileEntry.from(delete))); + + // check + assertThat(fileIO.exists(myDataFile)).isFalse(); + assertThat(fileIO.exists(extra1)).isFalse(); + assertThat(fileIO.exists(extra2)).isFalse(); + + store.assertCleaned(); + } + @Test public void testNoSnapshot() throws IOException { ExpireSnapshots expire = store.newExpire(1, 3, Long.MAX_VALUE); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java index 5139dd44957d..775f8a4d37ad 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.Changelog; import org.apache.paimon.CoreOptions; +import org.apache.paimon.CoreOptions.ExternalPathStrategy; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.DataFormatTestUtil; @@ -89,6 +90,7 @@ public class LocalOrphanFilesCleanTest { private static final Random RANDOM = new Random(System.currentTimeMillis()); @TempDir private java.nio.file.Path tempDir; + @TempDir private java.nio.file.Path tmpExternalPath; private Path tablePath; private FileIO fileIO; private RowType rowType; @@ -96,7 +98,6 @@ public class LocalOrphanFilesCleanTest { private TableWriteImpl write; private TableCommitImpl commit; private Path manifestDir; - private long incrementalIdentifier; private List manuallyAddedFiles; @@ -127,13 +128,136 @@ public void afterEach() throws Exception { TestPojo.reset(); } + @Test + public void testNormallyRemovingWithExternalPath() throws Throwable { + // recreate the table with another option + this.write.close(); + this.commit.close(); + Options options = new Options(); + String externalPaths = "file://" + tmpExternalPath; + options.set(CoreOptions.DATA_FILE_EXTERNAL_PATHS, externalPaths); + options.set( + CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY, ExternalPathStrategy.ROUND_ROBIN); + this.table = createFileStoreTable(rowType, options); + String commitUser = UUID.randomUUID().toString(); + write = table.newWrite(commitUser); + commit = table.newCommit(commitUser); + normallyRemoving(new Path(tmpExternalPath.toString())); + } + @Test public void testNormallyRemoving() throws Throwable { + normallyRemoving(tablePath); + } + + public void normallyRemoving(Path dataPath) throws Throwable { + int commitTimes = 30; + List> committedData = new ArrayList<>(); + Map> snapshotData = new HashMap<>(); + + SnapshotManager snapshotManager = table.snapshotManager(); + writeData(snapshotManager, committedData, snapshotData, new HashMap<>(), commitTimes); + + // randomly create tags + List allTags = new ArrayList<>(); + int snapshotCount = (int) snapshotManager.snapshotCount(); + for (int i = 1; i <= snapshotCount; i++) { + if (RANDOM.nextBoolean()) { + String tagName = "tag" + i; + table.createTag(tagName, i); + allTags.add(tagName); + } + } + + // create branch1 by tag + table.createBranch("branch1", allTags.get(0)); + + // generate non used files + int shouldBeDeleted = generateUnUsedFile(dataPath); + assertThat(manuallyAddedFiles.size()).isEqualTo(shouldBeDeleted); + + // randomly expire snapshots + int expired = RANDOM.nextInt(snapshotCount / 2); + expired = expired == 0 ? 1 : expired; + Options expireOptions = new Options(); + expireOptions.set(CoreOptions.SNAPSHOT_EXPIRE_LIMIT, snapshotCount); + expireOptions.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, snapshotCount - expired); + expireOptions.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, snapshotCount - expired); + table.copy(expireOptions.toMap()).newCommit("").expireSnapshots(); + + // randomly delete tags + List deleteTags = Collections.emptyList(); + deleteTags = randomlyPick(allTags); + for (String tagName : deleteTags) { + table.deleteTag(tagName); + } + + // first check, nothing will be deleted because the default olderThan interval is 1 day + LocalOrphanFilesClean orphanFilesClean = new LocalOrphanFilesClean(table); + assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isEqualTo(0); + + // second check + orphanFilesClean = + new LocalOrphanFilesClean( + table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2)); + List deleted = orphanFilesClean.clean().getDeletedFilesPath(); + try { + validate(deleted, snapshotData, new HashMap<>()); + } catch (Throwable t) { + String tableOptions = "Table options:\n" + table.options(); + + String committed = "Committed data:"; + for (int i = 0; i < committedData.size(); i++) { + String insertValues = + committedData.get(i).stream() + .map(TestPojo::toInsertValueString) + .collect(Collectors.joining(",")); + committed = String.format("%s\n%d:{%s}", committed, i, insertValues); + } + + String snapshot = "Snapshot expired: " + expired; + + String tag = + String.format( + "Tags: created{%s}; deleted{%s}", + String.join(",", allTags), String.join(",", deleteTags)); + + String addedFile = + "Manually added file:\n" + + manuallyAddedFiles.stream() + .map(Path::toString) + .collect(Collectors.joining("\n")); + + throw new Exception( + String.format( + "%s\n%s\n%s\n%s\n%s", + tableOptions, committed, snapshot, tag, addedFile), + t); + } + } + + @Test + public void testNormallyRemovingMixedWithExternalPath() throws Throwable { int commitTimes = 30; List> committedData = new ArrayList<>(); Map> snapshotData = new HashMap<>(); SnapshotManager snapshotManager = table.snapshotManager(); + // 1. write data to the warehouse path + writeData(snapshotManager, committedData, snapshotData, new HashMap<>(), commitTimes); + + // 2. write data to the external path + this.write.close(); + this.commit.close(); + String externalPaths = "file://" + tmpExternalPath; + table.options().put(CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), externalPaths); + table.options().put(CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), "round-robin"); + table = table.copy(table.options()); + + String commitUser = UUID.randomUUID().toString(); + write = table.newWrite(commitUser); + commit = table.newCommit(commitUser); + snapshotManager = table.snapshotManager(); writeData(snapshotManager, committedData, snapshotData, new HashMap<>(), commitTimes); // randomly create tags @@ -151,7 +275,8 @@ public void testNormallyRemoving() throws Throwable { table.createBranch("branch1", allTags.get(0)); // generate non used files - int shouldBeDeleted = generateUnUsedFile(); + int shouldBeDeleted = generateUnUsedFile(tablePath); + shouldBeDeleted += generateUnUsedFile(new Path(tmpExternalPath.toString())); assertThat(manuallyAddedFiles.size()).isEqualTo(shouldBeDeleted); // randomly expire snapshots @@ -330,15 +455,34 @@ private void validateSnapshot(Snapshot snapshot, List data) throws Exc @ParameterizedTest(name = "changelog-producer = {0}") public void testCleanOrphanFilesWithChangelogDecoupled(String changelogProducer) throws Exception { + Options options = new Options(); + options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 15); + options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MAX, 20); + options.set(CoreOptions.CHANGELOG_PRODUCER.key(), changelogProducer); + cleanOrphanFilesWithChangelogDecoupled(tablePath, options); + } + + @ValueSource(strings = {"none", "input"}) + @ParameterizedTest(name = "changelog-producer = {0}") + public void testCleanOrphanFilesWithChangelogDecoupledWithExternalPath(String changelogProducer) + throws Exception { + Options options = new Options(); + options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 15); + options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MAX, 20); + options.set(CoreOptions.CHANGELOG_PRODUCER.key(), changelogProducer); + String externalPaths = "file://" + tmpExternalPath; + options.set(CoreOptions.DATA_FILE_EXTERNAL_PATHS, externalPaths); + options.set( + CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY, ExternalPathStrategy.ROUND_ROBIN); + cleanOrphanFilesWithChangelogDecoupled(new Path(tmpExternalPath.toString()), options); + } + + public void cleanOrphanFilesWithChangelogDecoupled(Path dataPath, Options options) + throws Exception { // recreate the table with another option this.write.close(); this.commit.close(); int commitTimes = 30; - Options options = new Options(); - options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT); - options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 15); - options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MAX, 20); - options.setString(CoreOptions.CHANGELOG_PRODUCER.key(), changelogProducer); FileStoreTable table = createFileStoreTable(rowType, options); String commitUser = UUID.randomUUID().toString(); this.table = table; @@ -356,7 +500,7 @@ public void testCleanOrphanFilesWithChangelogDecoupled(String changelogProducer) table.createBranch("branch1"); // generate non used files - int shouldBeDeleted = generateUnUsedFile(); + int shouldBeDeleted = generateUnUsedFile(dataPath); assertThat(manuallyAddedFiles.size()).isEqualTo(shouldBeDeleted); // first check, nothing will be deleted because the default olderThan interval is 1 day @@ -408,11 +552,17 @@ private void writeData( int commitTimes) throws Exception { // first snapshot + Long latestSnapshotId = snapshotManager.latestSnapshotId(); List data = generateData(); commit(data); committedData.add(data); - recordSnapshotData(data, snapshotData, snapshotManager); - recordChangelogData(new ArrayList<>(), data, changelogData, snapshotManager); + List current = new ArrayList<>(); + if (latestSnapshotId != null) { + current.addAll(snapshotData.get(latestSnapshotId)); + } + current.addAll(data); + recordSnapshotData(current, snapshotData, snapshotManager); + recordChangelogData(new ArrayList<>(), current, changelogData, snapshotManager); // randomly generate data for (int i = 1; i <= commitTimes; i++) { @@ -429,7 +579,7 @@ private void writeData( recordSnapshotData(previous, snapshotData, snapshotManager); recordChangelogData(toBeUpdated, updateAfter, changelogData, snapshotManager); } else { - List current = generateData(); + current = generateData(); commit(current); committedData.add(current); @@ -440,7 +590,7 @@ private void writeData( } } - private int generateUnUsedFile() throws Exception { + private int generateUnUsedFile(Path dataPath) throws Exception { int shouldBeDeleted = 0; int fileNum = RANDOM.nextInt(10); fileNum = fileNum == 0 ? 1 : fileNum; @@ -458,7 +608,7 @@ private int generateUnUsedFile() throws Exception { shouldBeDeleted += fileNum; // data files - shouldBeDeleted += randomlyAddNonUsedDataFiles(); + shouldBeDeleted += randomlyAddNonUsedDataFiles(dataPath); // manifests addNonUsedFiles( @@ -544,9 +694,9 @@ private void recordChangelogData( } } - private int randomlyAddNonUsedDataFiles() throws IOException { + private int randomlyAddNonUsedDataFiles(Path dataPath) throws IOException { int addedFiles = 0; - List part1 = listSubDirs(tablePath, p -> p.getName().contains("=")); + List part1 = listSubDirs(dataPath, p -> p.getName().contains("=")); List part2 = new ArrayList<>(); List buckets = new ArrayList<>(); for (Path path : part1) { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java index 23f2f0261d68..c46b1bb445e8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java @@ -23,7 +23,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import java.nio.file.Path; import java.time.LocalDateTime; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv; @@ -33,6 +35,8 @@ /** IT cases for {@link ExpireTagsAction}. */ public class ExpireTagsActionTest extends ActionITCaseBase { + @TempDir private Path tempExternalPath; + @BeforeEach public void setUp() { init(warehouse); @@ -45,6 +49,26 @@ public void testExpireTags() throws Exception { + " PRIMARY KEY (id) NOT ENFORCED)" + " WITH ('bucket'='1', 'write-only'='true')"); + expireTags(); + } + + @Test + public void testExpireTagsWithExternalPath() throws Exception { + String externalPath = "file://" + tempExternalPath; + bEnv.executeSql( + "CREATE TABLE T (id STRING, name STRING," + + " PRIMARY KEY (id) NOT ENFORCED)" + + " WITH (" + + "'data-file.external-paths'='" + + externalPath + + "'," + + "'data-file.external-paths.strategy' = 'round-robin'," + + "'write-only'='true')"); + + expireTags(); + } + + public void expireTags() throws Exception { FileStoreTable table = getFileStoreTable("T"); // generate 5 snapshots