From 90feb091695fc7fc06e28281abc56df340888fc6 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Mon, 13 Jan 2025 17:32:09 +0800 Subject: [PATCH 1/5] refactor the expire function --- .../AbstractIcebergCommitCallback.java | 14 +- .../apache/paimon/io/DataFilePathFactory.java | 8 ++ .../paimon/operation/FileDeletionBase.java | 27 +++- .../apache/paimon/operation/TagDeletion.java | 15 +- .../java/org/apache/paimon/TestFileStore.java | 53 ++++--- .../paimon/operation/ExpireSnapshotsTest.java | 133 ++++++++++++++++++ paimon-flink/paimon-flink-common/pom.xml | 6 + .../flink/action/ExpireTagsActionTest.java | 29 ++++ 8 files changed, 255 insertions(+), 30 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java index f561546e8bb3..9301dab3eba2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/AbstractIcebergCommitCallback.java @@ -41,6 +41,7 @@ import org.apache.paimon.iceberg.metadata.IcebergSnapshot; import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.options.Options; @@ -446,11 +447,16 @@ private boolean collectFileChanges( Map removedFiles, Map> addedFiles) { boolean isAddOnly = true; + Map, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>(); for (ManifestEntry entry : manifestEntries) { - String path = - fileStorePathFactory.bucketPath(entry.partition(), entry.bucket()) - + "/" - + entry.fileName(); + Pair bucket = Pair.of(entry.partition(), entry.bucket()); + DataFilePathFactory dataFilePathFactory = + dataFilePathFactoryMap.computeIfAbsent( + bucket, + b -> + fileStorePathFactory.createDataFilePathFactory( + entry.partition(), entry.bucket())); + String path = dataFilePathFactory.toPath(entry).toString(); switch (entry.kind()) { case ADD: if (shouldAddFileToIceberg(entry.file())) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java index a70a795ef02d..eca6258e2eee 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java @@ -110,6 +110,14 @@ public Path toAlignedPath(String fileName, DataFileMeta aligned) { return new Path(aligned.externalPathDir().map(Path::new).orElse(parent), fileName); } + public Path toAlignedPath(String fileName, FileEntry aligned) { + Optional externalPathDir = + Optional.ofNullable(aligned.externalPath()) + .map(Path::new) + .map(p -> p.getParent().toUri().toString()); + return new Path(externalPathDir.map(Path::new).orElse(parent), fileName); + } + public static Path dataFileToFileIndexPath(Path dataFilePath) { return new Path(dataFilePath.getParent(), dataFilePath.getName() + INDEX_PATH_SUFFIX); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java index cfecd767b6fb..1a029245078f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java @@ -24,6 +24,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.manifest.ExpireFileEntry; import org.apache.paimon.manifest.FileEntry; import org.apache.paimon.manifest.FileEntry.Identifier; @@ -216,9 +217,16 @@ protected void getDataFileToDelete( List dataFileEntries) { // we cannot delete a data file directly when we meet a DELETE entry, because that // file might be upgraded + Map, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>(); for (ExpireFileEntry entry : dataFileEntries) { - Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket()); - Path dataFilePath = new Path(bucketPath, entry.fileName()); + Pair bucket = Pair.of(entry.partition(), entry.bucket()); + DataFilePathFactory dataFilePathFactory = + dataFilePathFactoryMap.computeIfAbsent( + bucket, + b -> + pathFactory.createDataFilePathFactory( + entry.partition(), entry.bucket())); + Path dataFilePath = dataFilePathFactory.toPath(entry); switch (entry.kind()) { case ADD: dataFileToDelete.remove(dataFilePath); @@ -226,7 +234,7 @@ protected void getDataFileToDelete( case DELETE: List extraFiles = new ArrayList<>(entry.extraFiles().size()); for (String file : entry.extraFiles()) { - extraFiles.add(new Path(bucketPath, file)); + extraFiles.add(dataFilePathFactory.toAlignedPath(file, entry)); } dataFileToDelete.put(dataFilePath, Pair.of(entry, extraFiles)); break; @@ -259,12 +267,17 @@ public void deleteAddedDataFiles(String manifestListName) { private void deleteAddedDataFiles(List manifestEntries) { List dataFileToDelete = new ArrayList<>(); + Map, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>(); for (ExpireFileEntry entry : manifestEntries) { + Pair bucket = Pair.of(entry.partition(), entry.bucket()); + DataFilePathFactory dataFilePathFactory = + dataFilePathFactoryMap.computeIfAbsent( + bucket, + b -> + pathFactory.createDataFilePathFactory( + entry.partition(), entry.bucket())); if (entry.kind() == FileKind.ADD) { - dataFileToDelete.add( - new Path( - pathFactory.bucketPath(entry.partition(), entry.bucket()), - entry.fileName())); + dataFileToDelete.add(dataFilePathFactory.toPath(entry)); recordDeletionBuckets(entry); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java index 2722ed0c7ec8..532b902c185b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java @@ -23,11 +23,13 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.manifest.ExpireFileEntry; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.stats.StatsFileHandler; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -78,12 +80,19 @@ public void cleanUnusedDataFiles(Snapshot taggedSnapshot, Predicate dataFileToDelete = new HashSet<>(); + Map, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>(); for (ExpireFileEntry entry : manifestEntries) { + Pair bucket = Pair.of(entry.partition(), entry.bucket()); + DataFilePathFactory dataFilePathFactory = + dataFilePathFactoryMap.computeIfAbsent( + bucket, + b -> + pathFactory.createDataFilePathFactory( + entry.partition(), entry.bucket())); if (!skipper.test(entry)) { - Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket()); - dataFileToDelete.add(new Path(bucketPath, entry.fileName())); + dataFileToDelete.add(dataFilePathFactory.toPath(entry)); for (String file : entry.extraFiles()) { - dataFileToDelete.add(new Path(bucketPath, file)); + dataFileToDelete.add(dataFilePathFactory.toAlignedPath(file, entry)); } recordDeletionBuckets(entry); diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index 0d8ea5f4a49a..48fa81c525d8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -25,6 +25,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.IndexIncrement; import org.apache.paimon.manifest.FileEntry; import org.apache.paimon.manifest.FileKind; @@ -58,6 +59,7 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.CommitIncrement; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.RecordWriter; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -643,11 +645,17 @@ private static Set getSnapshotFileInUse( .flatMap(m -> manifestFile.read(m.fileName()).stream()) .collect(Collectors.toList()); entries = new ArrayList<>(FileEntry.mergeEntries(entries)); + Map, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>(); + for (ManifestEntry entry : entries) { - result.add( - new Path( - pathFactory.bucketPath(entry.partition(), entry.bucket()), - entry.file().fileName())); + Pair bucket = Pair.of(entry.partition(), entry.bucket()); + DataFilePathFactory dataFilePathFactory = + dataFilePathFactoryMap.computeIfAbsent( + bucket, + b -> + pathFactory.createDataFilePathFactory( + entry.partition(), entry.bucket())); + result.add(dataFilePathFactory.toPath(entry)); } // Add 'DELETE' 'APPEND' file in snapshot @@ -666,10 +674,14 @@ private static Set getSnapshotFileInUse( if (entry.kind() == FileKind.DELETE && entry.file().fileSource().orElse(FileSource.APPEND) == FileSource.APPEND) { - result.add( - new Path( - pathFactory.bucketPath(entry.partition(), entry.bucket()), - entry.file().fileName())); + Pair bucket = Pair.of(entry.partition(), entry.bucket()); + DataFilePathFactory dataFilePathFactory = + dataFilePathFactoryMap.computeIfAbsent( + bucket, + b -> + pathFactory.createDataFilePathFactory( + entry.partition(), entry.bucket())); + result.add(dataFilePathFactory.toPath(entry)); } } } @@ -694,6 +706,7 @@ private static Set getChangelogFileInUse( // changelog file result.add(changelogPath); + Map, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>(); // data file // not all manifests contains useful data file // (1) produceChangelog = 'true': data file in changelog manifests @@ -716,10 +729,14 @@ private static Set getChangelogFileInUse( .collect(Collectors.toList()); for (ManifestEntry entry : files) { if (entry.file().fileSource().orElse(FileSource.APPEND) == FileSource.APPEND) { - result.add( - new Path( - pathFactory.bucketPath(entry.partition(), entry.bucket()), - entry.file().fileName())); + Pair bucket = Pair.of(entry.partition(), entry.bucket()); + DataFilePathFactory dataFilePathFactory = + dataFilePathFactoryMap.computeIfAbsent( + bucket, + b -> + pathFactory.createDataFilePathFactory( + entry.partition(), entry.bucket())); + result.add(dataFilePathFactory.toPath(entry)); } } } else if (changelog.changelogManifestList() != null) { @@ -731,10 +748,14 @@ private static Set getChangelogFileInUse( .flatMap(m -> manifestFile.read(m.fileName()).stream()) .collect(Collectors.toList()); for (ManifestEntry entry : files) { - result.add( - new Path( - pathFactory.bucketPath(entry.partition(), entry.bucket()), - entry.file().fileName())); + Pair bucket = Pair.of(entry.partition(), entry.bucket()); + DataFilePathFactory dataFilePathFactory = + dataFilePathFactoryMap.computeIfAbsent( + bucket, + b -> + pathFactory.createDataFilePathFactory( + entry.partition(), entry.bucket())); + result.add(dataFilePathFactory.toPath(entry)); } } return result; diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 5811fc37216b..2de6a01e2185 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.operation; import org.apache.paimon.CoreOptions; +import org.apache.paimon.CoreOptions.ExternalPathStrategy; import org.apache.paimon.KeyValue; import org.apache.paimon.Snapshot; import org.apache.paimon.TestFileStore; @@ -29,6 +30,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.manifest.ExpireFileEntry; import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.FileSource; @@ -74,6 +76,7 @@ public class ExpireSnapshotsTest { protected final FileIO fileIO = new LocalFileIO(); protected TestKeyValueGenerator gen; @TempDir java.nio.file.Path tempDir; + @TempDir java.nio.file.Path tempExternalPath; protected TestFileStore store; protected SnapshotManager snapshotManager; @@ -112,6 +115,75 @@ public void testExpireWithMissingFiles() throws Exception { .filter(p -> !filesInUse.contains(p)) .collect(Collectors.toList()); + for (int i = 0; i < unusedFileList.size(); i++) { + System.out.println("qihouliang111= " + unusedFileList.get(i)); + } + + for (Path path : filesInUse) { + System.out.println("qihouliang222= " + path); + } + + // shuffle list + ThreadLocalRandom random = ThreadLocalRandom.current(); + for (int i = unusedFileList.size() - 1; i > 0; i--) { + int j = random.nextInt(i + 1); + Collections.swap(unusedFileList, i, j); + } + + // delete some unused files + int numFilesToDelete = random.nextInt(unusedFileList.size()); + for (int i = 0; i < numFilesToDelete; i++) { + fileIO.deleteQuietly(unusedFileList.get(i)); + } + + expire.expire(); + + for (int i = 1; i < latestSnapshotId; i++) { + assertThat(snapshotManager.snapshotExists(i)).isFalse(); + } + assertThat(snapshotManager.snapshotExists(latestSnapshotId)).isTrue(); + assertSnapshot(latestSnapshotId, allData, snapshotPositions); + } + + @Test + public void testExpireWithMissingFilesWithExternalPath() throws Exception { + String externalPath = "file://" + tempExternalPath.toString(); + store.options().toConfiguration().set(CoreOptions.DATA_FILE_EXTERNAL_PATHS, externalPath); + store.options() + .toConfiguration() + .set( + CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY, + ExternalPathStrategy.ROUND_ROBIN); + ExpireSnapshots expire = store.newExpire(1, 1, 1); + + List allData = new ArrayList<>(); + List snapshotPositions = new ArrayList<>(); + commit(5, allData, snapshotPositions); + + int latestSnapshotId = requireNonNull(snapshotManager.latestSnapshotId()).intValue(); + Set filesInUseTmp = store.getFilesInUse(latestSnapshotId); + + Set filesInUse = + filesInUseTmp.stream() + .map(p -> new Path(Paths.get(p.toUri().getPath()).toString())) + .collect(Collectors.toSet()); + List unusedFileList = + Files.walk(Paths.get(tempDir.toString())) + .filter(Files::isRegularFile) + .filter(p -> !p.getFileName().toString().startsWith("snapshot")) + .filter(p -> !p.getFileName().toString().startsWith("schema")) + .map(p -> new Path(p.toString())) + .filter(p -> !filesInUse.contains(p)) + .collect(Collectors.toList()); + unusedFileList.addAll( + Files.walk(Paths.get(tempExternalPath.toString())) + .filter(Files::isRegularFile) + .filter(p -> !p.getFileName().toString().startsWith("snapshot")) + .filter(p -> !p.getFileName().toString().startsWith("schema")) + .map(p -> new Path(p.toString())) + .filter(p -> !filesInUse.contains(p)) + .collect(Collectors.toList())); + // shuffle list ThreadLocalRandom random = ThreadLocalRandom.current(); for (int i = unusedFileList.size() - 1; i > 0; i--) { @@ -233,6 +305,67 @@ public void testExpireExtraFiles() throws IOException { store.assertCleaned(); } + @Test + public void testExpireExtraFilesWithExternalPath() throws IOException { + String externalPath = "file://" + tempExternalPath.toString(); + store.options().toConfiguration().set(CoreOptions.DATA_FILE_EXTERNAL_PATHS, externalPath); + store.options() + .toConfiguration() + .set( + CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY, + ExternalPathStrategy.ROUND_ROBIN); + ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) store.newExpire(1, 3, Long.MAX_VALUE); + // write test files + BinaryRow partition = gen.getPartition(gen.next()); + + DataFilePathFactory dataFilePathFactory = + store.pathFactory().createDataFilePathFactory(partition, 0); + Path myDataFile = dataFilePathFactory.newPath(); + String fileName = myDataFile.getName(); + new LocalFileIO().tryToWriteAtomic(myDataFile, "1"); + Path extra1 = new Path(myDataFile.getParent(), "extra1"); + fileIO.tryToWriteAtomic(extra1, "2"); + Path extra2 = new Path(myDataFile.getParent(), "extra2"); + fileIO.tryToWriteAtomic(extra2, "3"); + + // create DataFileMeta and ManifestEntry + List extraFiles = Arrays.asList("extra1", "extra2"); + DataFileMeta dataFile = + new DataFileMeta( + fileName, + 1, + 1, + EMPTY_ROW, + EMPTY_ROW, + null, + null, + 0, + 1, + 0, + 0, + extraFiles, + Timestamp.now(), + 0L, + null, + FileSource.APPEND, + null, + myDataFile.toString()); + ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, dataFile); + ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 0, 1, dataFile); + + // expire + expire.snapshotDeletion() + .cleanUnusedDataFile( + Arrays.asList(ExpireFileEntry.from(add), ExpireFileEntry.from(delete))); + + // check + assertThat(fileIO.exists(myDataFile)).isFalse(); + assertThat(fileIO.exists(extra1)).isFalse(); + assertThat(fileIO.exists(extra2)).isFalse(); + + store.assertCleaned(); + } + @Test public void testNoSnapshot() throws IOException { ExpireSnapshots expire = store.newExpire(1, 3, Long.MAX_VALUE); diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index 84d4622b02b8..ef61000f6a19 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -184,6 +184,12 @@ under the License. ${okhttp.version} test + + org.apache.flink + flink-core + 1.18.1 + test + diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java index 23f2f0261d68..a520f7c793c9 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java @@ -23,7 +23,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import java.nio.file.Path; import java.time.LocalDateTime; import static org.apache.paimon.flink.util.ReadWriteTableTestUtil.bEnv; @@ -33,6 +35,8 @@ /** IT cases for {@link ExpireTagsAction}. */ public class ExpireTagsActionTest extends ActionITCaseBase { + @TempDir private Path tempExternalPath; + @BeforeEach public void setUp() { init(warehouse); @@ -45,6 +49,31 @@ public void testExpireTags() throws Exception { + " PRIMARY KEY (id) NOT ENFORCED)" + " WITH ('bucket'='1', 'write-only'='true')"); + expireTags(); + } + + @Test + public void testExpireTagsWithExternalPath() throws Exception { + String externalPath = "file://" + tempExternalPath; + bEnv.executeSql( + "CREATE TABLE T (id STRING, name STRING," + + " PRIMARY KEY (id) NOT ENFORCED)" + + " WITH (" + + "'data-file.external-paths'='" + + externalPath + + "'," + + "'data-file.external-paths.strategy' = 'round-robin'," + + "'write-only'='true')"); + + expireTags(); + } + + public void expireTags() throws Exception { + bEnv.executeSql( + "CREATE TABLE T (id STRING, name STRING," + + " PRIMARY KEY (id) NOT ENFORCED)" + + " WITH ('bucket'='1', 'write-only'='true')"); + FileStoreTable table = getFileStoreTable("T"); // generate 5 snapshots From dff5dcb3703e4e7956901408d6e0913f1789ddb9 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Tue, 14 Jan 2025 10:36:27 +0800 Subject: [PATCH 2/5] remove useless codes --- paimon-flink/paimon-flink-common/pom.xml | 6 ------ .../apache/paimon/flink/action/ExpireTagsActionTest.java | 5 ----- 2 files changed, 11 deletions(-) diff --git a/paimon-flink/paimon-flink-common/pom.xml b/paimon-flink/paimon-flink-common/pom.xml index ef61000f6a19..84d4622b02b8 100644 --- a/paimon-flink/paimon-flink-common/pom.xml +++ b/paimon-flink/paimon-flink-common/pom.xml @@ -184,12 +184,6 @@ under the License. ${okhttp.version} test - - org.apache.flink - flink-core - 1.18.1 - test - diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java index a520f7c793c9..c46b1bb445e8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/ExpireTagsActionTest.java @@ -69,11 +69,6 @@ public void testExpireTagsWithExternalPath() throws Exception { } public void expireTags() throws Exception { - bEnv.executeSql( - "CREATE TABLE T (id STRING, name STRING," - + " PRIMARY KEY (id) NOT ENFORCED)" - + " WITH ('bucket'='1', 'write-only'='true')"); - FileStoreTable table = getFileStoreTable("T"); // generate 5 snapshots From abd6cc96bbb95fa287092261067c0d9a25fe0d86 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Tue, 14 Jan 2025 21:26:20 +0800 Subject: [PATCH 3/5] refactor the orphan clean --- .../operation/LocalOrphanFilesClean.java | 2 +- .../paimon/operation/OrphanFilesClean.java | 8 +++ .../operation/LocalOrphanFilesCleanTest.java | 67 +++++++++++++++---- 3 files changed, 63 insertions(+), 14 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index 6a4276662468..c64b866929a5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -97,7 +97,7 @@ public CleanOrphanFilesResult clean() // specially handle to clear snapshot dir cleanSnapshotDir(branches, deleteFiles::add, deletedFilesLenInBytes::addAndGet); - // delete candidate files + // get candidate files Map> candidates = getCandidateDeletingFiles(); if (candidates.isEmpty()) { return new CleanOrphanFilesResult( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 54e082091840..2c669c3ea9e0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -262,6 +262,14 @@ protected List listPaimonFileDirs() { paimonFileDirs.add(pathFactory.statisticsPath()); paimonFileDirs.addAll(listFileDirs(pathFactory.dataFilePath(), partitionKeysNum)); + // add external data paths + String dataFileExternalPaths = table.store().options().dataFileExternalPaths(); + if (dataFileExternalPaths != null) { + String[] externalPathArr = dataFileExternalPaths.split(","); + for (String externalPath : externalPathArr) { + paimonFileDirs.addAll(listFileDirs(new Path(externalPath), partitionKeysNum)); + } + } return paimonFileDirs; } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java index 5139dd44957d..cda026db02bf 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.Changelog; import org.apache.paimon.CoreOptions; +import org.apache.paimon.CoreOptions.ExternalPathStrategy; import org.apache.paimon.Snapshot; import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.DataFormatTestUtil; @@ -89,14 +90,14 @@ public class LocalOrphanFilesCleanTest { private static final Random RANDOM = new Random(System.currentTimeMillis()); @TempDir private java.nio.file.Path tempDir; - private Path tablePath; + private static Path tablePath; private FileIO fileIO; private RowType rowType; private FileStoreTable table; private TableWriteImpl write; private TableCommitImpl commit; private Path manifestDir; - + @TempDir private java.nio.file.Path tmpExternalPath; private long incrementalIdentifier; private List manuallyAddedFiles; @@ -127,8 +128,29 @@ public void afterEach() throws Exception { TestPojo.reset(); } + @Test + public void testNormallyRemovingWithExternalPath() throws Throwable { + // recreate the table with another option + this.write.close(); + this.commit.close(); + Options options = new Options(); + String externalPaths = "file://" + tmpExternalPath; + options.set(CoreOptions.DATA_FILE_EXTERNAL_PATHS, externalPaths); + options.set( + CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY, ExternalPathStrategy.ROUND_ROBIN); + this.table = createFileStoreTable(rowType, options); + String commitUser = UUID.randomUUID().toString(); + write = table.newWrite(commitUser); + commit = table.newCommit(commitUser); + normallyRemoving(new Path(tmpExternalPath.toString())); + } + @Test public void testNormallyRemoving() throws Throwable { + normallyRemoving(tablePath); + } + + public void normallyRemoving(Path dataPath) throws Throwable { int commitTimes = 30; List> committedData = new ArrayList<>(); Map> snapshotData = new HashMap<>(); @@ -151,7 +173,7 @@ public void testNormallyRemoving() throws Throwable { table.createBranch("branch1", allTags.get(0)); // generate non used files - int shouldBeDeleted = generateUnUsedFile(); + int shouldBeDeleted = generateUnUsedFile(dataPath); assertThat(manuallyAddedFiles.size()).isEqualTo(shouldBeDeleted); // randomly expire snapshots @@ -330,15 +352,34 @@ private void validateSnapshot(Snapshot snapshot, List data) throws Exc @ParameterizedTest(name = "changelog-producer = {0}") public void testCleanOrphanFilesWithChangelogDecoupled(String changelogProducer) throws Exception { + Options options = new Options(); + options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 15); + options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MAX, 20); + options.set(CoreOptions.CHANGELOG_PRODUCER.key(), changelogProducer); + cleanOrphanFilesWithChangelogDecoupled(tablePath, options); + } + + @ValueSource(strings = {"none", "input"}) + @ParameterizedTest(name = "changelog-producer = {0}") + public void testCleanOrphanFilesWithChangelogDecoupledWithExternalPath(String changelogProducer) + throws Exception { + Options options = new Options(); + options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 15); + options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MAX, 20); + options.set(CoreOptions.CHANGELOG_PRODUCER.key(), changelogProducer); + String externalPaths = "file://" + tmpExternalPath; + options.set(CoreOptions.DATA_FILE_EXTERNAL_PATHS, externalPaths); + options.set( + CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY, ExternalPathStrategy.ROUND_ROBIN); + cleanOrphanFilesWithChangelogDecoupled(new Path(tmpExternalPath.toString()), options); + } + + public void cleanOrphanFilesWithChangelogDecoupled(Path dataPath, Options options) + throws Exception { // recreate the table with another option this.write.close(); this.commit.close(); int commitTimes = 30; - Options options = new Options(); - options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT); - options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 15); - options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MAX, 20); - options.setString(CoreOptions.CHANGELOG_PRODUCER.key(), changelogProducer); FileStoreTable table = createFileStoreTable(rowType, options); String commitUser = UUID.randomUUID().toString(); this.table = table; @@ -356,7 +397,7 @@ public void testCleanOrphanFilesWithChangelogDecoupled(String changelogProducer) table.createBranch("branch1"); // generate non used files - int shouldBeDeleted = generateUnUsedFile(); + int shouldBeDeleted = generateUnUsedFile(dataPath); assertThat(manuallyAddedFiles.size()).isEqualTo(shouldBeDeleted); // first check, nothing will be deleted because the default olderThan interval is 1 day @@ -440,7 +481,7 @@ private void writeData( } } - private int generateUnUsedFile() throws Exception { + private int generateUnUsedFile(Path dataPath) throws Exception { int shouldBeDeleted = 0; int fileNum = RANDOM.nextInt(10); fileNum = fileNum == 0 ? 1 : fileNum; @@ -458,7 +499,7 @@ private int generateUnUsedFile() throws Exception { shouldBeDeleted += fileNum; // data files - shouldBeDeleted += randomlyAddNonUsedDataFiles(); + shouldBeDeleted += randomlyAddNonUsedDataFiles(dataPath); // manifests addNonUsedFiles( @@ -544,9 +585,9 @@ private void recordChangelogData( } } - private int randomlyAddNonUsedDataFiles() throws IOException { + private int randomlyAddNonUsedDataFiles(Path dataPath) throws IOException { int addedFiles = 0; - List part1 = listSubDirs(tablePath, p -> p.getName().contains("=")); + List part1 = listSubDirs(dataPath, p -> p.getName().contains("=")); List part2 = new ArrayList<>(); List buckets = new ArrayList<>(); for (Path path : part1) { From 1f03cb52126840c943dc82030b435dd07c83fb42 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Wed, 15 Jan 2025 17:31:08 +0800 Subject: [PATCH 4/5] add orphan clean ut that contains warehouse path data and external path data --- .../operation/LocalOrphanFilesCleanTest.java | 115 +++++++++++++++++- 1 file changed, 112 insertions(+), 3 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java index cda026db02bf..d2804b48590d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java @@ -236,6 +236,109 @@ public void normallyRemoving(Path dataPath) throws Throwable { } } + @Test + public void testNormallyRemovingMixedWithExternalPath() throws Throwable { + int commitTimes = 30; + List> committedData = new ArrayList<>(); + Map> snapshotData = new HashMap<>(); + + SnapshotManager snapshotManager = table.snapshotManager(); + // 1. write data to the warehouse path + writeData(snapshotManager, committedData, snapshotData, new HashMap<>(), commitTimes); + + // 2. write data to the external path + this.write.close(); + this.commit.close(); + String externalPaths = "file://" + tmpExternalPath; + table.options().put(CoreOptions.DATA_FILE_EXTERNAL_PATHS.key(), externalPaths); + table.options().put(CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY.key(), "round-robin"); + table = table.copy(table.options()); + + String commitUser = UUID.randomUUID().toString(); + write = table.newWrite(commitUser); + commit = table.newCommit(commitUser); + snapshotManager = table.snapshotManager(); + writeData(snapshotManager, committedData, snapshotData, new HashMap<>(), commitTimes); + + // randomly create tags + List allTags = new ArrayList<>(); + int snapshotCount = (int) snapshotManager.snapshotCount(); + for (int i = 1; i <= snapshotCount; i++) { + if (RANDOM.nextBoolean()) { + String tagName = "tag" + i; + table.createTag(tagName, i); + allTags.add(tagName); + } + } + + // create branch1 by tag + table.createBranch("branch1", allTags.get(0)); + + // generate non used files + int shouldBeDeleted = generateUnUsedFile(tablePath); + shouldBeDeleted += generateUnUsedFile(new Path(tmpExternalPath.toString())); + assertThat(manuallyAddedFiles.size()).isEqualTo(shouldBeDeleted); + + // randomly expire snapshots + int expired = RANDOM.nextInt(snapshotCount / 2); + expired = expired == 0 ? 1 : expired; + Options expireOptions = new Options(); + expireOptions.set(CoreOptions.SNAPSHOT_EXPIRE_LIMIT, snapshotCount); + expireOptions.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, snapshotCount - expired); + expireOptions.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, snapshotCount - expired); + table.copy(expireOptions.toMap()).newCommit("").expireSnapshots(); + + // randomly delete tags + List deleteTags = Collections.emptyList(); + deleteTags = randomlyPick(allTags); + for (String tagName : deleteTags) { + table.deleteTag(tagName); + } + + // first check, nothing will be deleted because the default olderThan interval is 1 day + LocalOrphanFilesClean orphanFilesClean = new LocalOrphanFilesClean(table); + assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isEqualTo(0); + + // second check + orphanFilesClean = + new LocalOrphanFilesClean( + table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2)); + List deleted = orphanFilesClean.clean().getDeletedFilesPath(); + try { + validate(deleted, snapshotData, new HashMap<>()); + } catch (Throwable t) { + String tableOptions = "Table options:\n" + table.options(); + + String committed = "Committed data:"; + for (int i = 0; i < committedData.size(); i++) { + String insertValues = + committedData.get(i).stream() + .map(TestPojo::toInsertValueString) + .collect(Collectors.joining(",")); + committed = String.format("%s\n%d:{%s}", committed, i, insertValues); + } + + String snapshot = "Snapshot expired: " + expired; + + String tag = + String.format( + "Tags: created{%s}; deleted{%s}", + String.join(",", allTags), String.join(",", deleteTags)); + + String addedFile = + "Manually added file:\n" + + manuallyAddedFiles.stream() + .map(Path::toString) + .collect(Collectors.joining("\n")); + + throw new Exception( + String.format( + "%s\n%s\n%s\n%s\n%s", + tableOptions, committed, snapshot, tag, addedFile), + t); + } + } + private void validate( List deleteFiles, Map> snapshotData, @@ -449,11 +552,17 @@ private void writeData( int commitTimes) throws Exception { // first snapshot + Long latestSnapshotId = snapshotManager.latestSnapshotId(); List data = generateData(); commit(data); committedData.add(data); - recordSnapshotData(data, snapshotData, snapshotManager); - recordChangelogData(new ArrayList<>(), data, changelogData, snapshotManager); + List current = new ArrayList<>(); + if (latestSnapshotId != null) { + current.addAll(snapshotData.get(latestSnapshotId)); + } + current.addAll(data); + recordSnapshotData(current, snapshotData, snapshotManager); + recordChangelogData(new ArrayList<>(), current, changelogData, snapshotManager); // randomly generate data for (int i = 1; i <= commitTimes; i++) { @@ -470,7 +579,7 @@ private void writeData( recordSnapshotData(previous, snapshotData, snapshotManager); recordChangelogData(toBeUpdated, updateAfter, changelogData, snapshotManager); } else { - List current = generateData(); + current = generateData(); commit(current); committedData.add(current); From 706d0cf7c19f99cdad3ca097c13a30b524808f5c Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Thu, 16 Jan 2025 11:49:49 +0800 Subject: [PATCH 5/5] remove useless codes --- .../org/apache/paimon/operation/ExpireSnapshotsTest.java | 8 -------- .../paimon/operation/LocalOrphanFilesCleanTest.java | 4 ++-- 2 files changed, 2 insertions(+), 10 deletions(-) diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 2de6a01e2185..ed1489ca7eb3 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -115,14 +115,6 @@ public void testExpireWithMissingFiles() throws Exception { .filter(p -> !filesInUse.contains(p)) .collect(Collectors.toList()); - for (int i = 0; i < unusedFileList.size(); i++) { - System.out.println("qihouliang111= " + unusedFileList.get(i)); - } - - for (Path path : filesInUse) { - System.out.println("qihouliang222= " + path); - } - // shuffle list ThreadLocalRandom random = ThreadLocalRandom.current(); for (int i = unusedFileList.size() - 1; i > 0; i--) { diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java index d2804b48590d..775f8a4d37ad 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java @@ -90,14 +90,14 @@ public class LocalOrphanFilesCleanTest { private static final Random RANDOM = new Random(System.currentTimeMillis()); @TempDir private java.nio.file.Path tempDir; - private static Path tablePath; + @TempDir private java.nio.file.Path tmpExternalPath; + private Path tablePath; private FileIO fileIO; private RowType rowType; private FileStoreTable table; private TableWriteImpl write; private TableCommitImpl commit; private Path manifestDir; - @TempDir private java.nio.file.Path tmpExternalPath; private long incrementalIdentifier; private List manuallyAddedFiles;