From a352c1504dc01810bafc6f6664ed0a2197f8d921 Mon Sep 17 00:00:00 2001 From: Askwang <135721692+Askwang@users.noreply.github.com> Date: Fri, 3 Jan 2025 10:48:20 +0800 Subject: [PATCH 1/9] clean empty directory --- .../operation/LocalOrphanFilesClean.java | 5 ++ .../paimon/operation/OrphanFilesClean.java | 29 ++++++++ .../spark/orphan/SparkOrphanFilesClean.scala | 28 ++++++-- .../RemoveOrphanFilesProcedureTest.scala | 72 +++++++++++++++++++ 4 files changed, 128 insertions(+), 6 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index 6a4276662468..8079c713cf29 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -127,6 +127,11 @@ public CleanOrphanFilesResult clean() .collect(Collectors.toList())); candidateDeletes.clear(); + // clean empty directory + Set deletedPaths = + deleteFiles.stream().map(Path::getParent).collect(Collectors.toSet()); + cleanEmptyDirectory(deletedPaths); + return new CleanOrphanFilesResult( deleteFiles.size(), deletedFilesLenInBytes.get(), deleteFiles); } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 54e082091840..950e1b26457e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -390,4 +390,33 @@ public static long olderThanMillis(@Nullable String olderThan) { return parsedTimestampData.getMillisecond(); } } + + public void cleanEmptyDirectory(Set deletedPaths) { + if (deletedPaths.isEmpty()) { + return; + } + + int level = 0; + while (level <= partitionKeysNum) { + Set parentPaths = new HashSet<>(); + for (Path path : deletedPaths) { + boolean deleted = tryDeleteEmptyDirectory(path); + if (deleted) { + LOG.info("Delete empty directory '{}'.", path); + parentPaths.add(path.getParent()); + } + } + deletedPaths = new HashSet<>(parentPaths); + level++; + } + } + + private boolean tryDeleteEmptyDirectory(Path path) { + try { + return fileIO.delete(path, false); + } catch (IOException e) { + LOG.debug("Failed to delete directory '{}' because it is not empty.", path); + return false; + } + } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala index fca0493ede28..0c68797a9ad8 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong import java.util.function.Consumer import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer case class SparkOrphanFilesClean( @@ -49,7 +50,8 @@ case class SparkOrphanFilesClean( with SQLConfHelper with Logging { - def doOrphanClean(): (Dataset[(Long, Long)], Dataset[BranchAndManifestFile]) = { + def doOrphanClean() + : (Dataset[(Long, Long)], (Dataset[BranchAndManifestFile], Dataset[(Long, Long, mutable.HashSet[String])])) = { import spark.implicits._ val branches = 
validBranches() @@ -137,6 +139,7 @@ case class SparkOrphanFilesClean( it => var deletedFilesCount = 0L var deletedFilesLenInBytes = 0L + val involvedDirectories = new mutable.HashSet[String]() while (it.hasNext) { val fileInfo = it.next(); @@ -145,22 +148,31 @@ case class SparkOrphanFilesClean( deletedFilesLenInBytes += fileInfo.getLong(2) specifiedFileCleaner.accept(deletedPath) logInfo(s"Cleaned file: $pathToClean") + involvedDirectories += deletedPath.getParent.toUri.toString deletedFilesCount += 1 } logInfo( s"Total cleaned files: $deletedFilesCount, Total cleaned files len : $deletedFilesLenInBytes") - Iterator.single((deletedFilesCount, deletedFilesLenInBytes)) + Iterator.single((deletedFilesCount, deletedFilesLenInBytes, involvedDirectories)) } + .cache() + + // clean empty directories + val deletedPaths = + deleted.flatMap { case (_, _, paths) => paths }.collect().map(new Path(_)).toSet + cleanEmptyDirectory(deletedPaths.asJava) + + val deletedResult = deleted.map { case (filesCount, filesLen, _) => (filesCount, filesLen) } val finalDeletedDataset = if (deletedFilesCountInLocal.get() != 0 || deletedFilesLenInBytesInLocal.get() != 0) { - deleted.union( + deletedResult.union( spark.createDataset( Seq((deletedFilesCountInLocal.get(), deletedFilesLenInBytesInLocal.get())))) } else { - deleted + deletedResult } - (finalDeletedDataset, usedManifestFiles) + (finalDeletedDataset, (usedManifestFiles, deleted)) } } @@ -230,7 +242,11 @@ object SparkOrphanFilesClean extends SQLConfHelper { new CleanOrphanFilesResult(result.getLong(0), result.getLong(1)) } } finally { - waitToRelease.foreach(_.unpersist()) + waitToRelease.foreach { + case (usedManifestFiles, deleted) => + usedManifestFiles.unpersist() + deleted.unpersist() + } } } } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala index f45655d5147c..7d6af4a2ebd3 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala @@ -219,6 +219,7 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase { checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil) } + test("Paimon procedure: remove orphan files with data file path directory") { sql(s""" |CREATE TABLE T (id STRING, name STRING) @@ -240,4 +241,75 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase { sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than')"), Row(1, 1) :: Nil) } + + test("Paimon procedure: clean empty directory after removing orphan files") { + spark.sql(""" + |CREATE TABLE T (k STRING, pt STRING) + |using paimon TBLPROPERTIES ('primary-key'='k,pt', 'bucket'='1', + |'snapshot.clean-empty-directories'='false') PARTITIONED BY (pt); + |""".stripMargin) + + spark.sql(""" + |insert into T values + |("a", "2024-06-02"),("b", "2024-06-02"),("d", "2024-06-03"), + |("c", "2024-06-01"),("Never-expire", "9999-09-09"); + | + |""".stripMargin) + + // by default, no file deleted + checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil) + + spark.sql( + "CALL sys.expire_partitions(table => 'T' , " + + "expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', max_expires => 3);") + + // insert a new 
snapshot to clean expired partitioned files + spark.sql("insert into T values ('Never-expire-2', '9999-09-09')") + + val table = loadTable("T") + val fileIO = table.fileIO() + val tablePath = table.location() + + val partitionValue = "pt=2024-06-01" + val partitionPath = tablePath + "/" + partitionValue + val orphanFile1 = new Path(partitionPath, ORPHAN_FILE_1) + val orphanFile2 = new Path(partitionPath, ORPHAN_FILE_2) + fileIO.writeFile(orphanFile1, "a", true) + Thread.sleep(2000) + fileIO.writeFile(orphanFile2, "b", true) + + checkAnswer( + spark.sql("CALL paimon.sys.expire_snapshots(table => 'T', retain_max => 1)"), + Row(2) :: Nil) + + val older_than1 = new java.sql.Timestamp( + fileIO.getFileStatus(orphanFile2).getModificationTime - TimeUnit.SECONDS.toMillis(1)) + + // partition 'pt=2024-06-01' has one orphan file left + assertResult(true)( + fileIO + .listDirectories(tablePath) + .map(status => status.getPath.getName) + .contains(partitionValue)) + + checkAnswer( + spark.sql( + s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than1', mode => 'distributed')"), + Row(1, 1) :: Nil) + + val older_than2 = new java.sql.Timestamp(System.currentTimeMillis()) + + checkAnswer( + spark.sql( + s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than2', mode => 'local')"), + Row(1, 1) :: Nil) + + // partition 'pt=2024-06-01' has no orphan files, clean empty directory + assertResult(false)( + fileIO + .listDirectories(tablePath) + .map(status => status.getPath.getName) + .contains(partitionValue)) + } + } From caac993e3c641fd5f59d98b2523e09c7c6bde9cb Mon Sep 17 00:00:00 2001 From: Askwang <135721692+Askwang@users.noreply.github.com> Date: Fri, 3 Jan 2025 11:44:36 +0800 Subject: [PATCH 2/9] fix --- .../apache/paimon/spark/orphan/SparkOrphanFilesClean.scala | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala index 0c68797a9ad8..dd34b4b44a56 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala @@ -37,7 +37,6 @@ import java.util.concurrent.atomic.AtomicLong import java.util.function.Consumer import scala.collection.JavaConverters._ -import scala.collection.mutable import scala.collection.mutable.ArrayBuffer case class SparkOrphanFilesClean( @@ -51,7 +50,7 @@ case class SparkOrphanFilesClean( with Logging { def doOrphanClean() - : (Dataset[(Long, Long)], (Dataset[BranchAndManifestFile], Dataset[(Long, Long, mutable.HashSet[String])])) = { + : (Dataset[(Long, Long)], (Dataset[BranchAndManifestFile], Dataset[(Long, Long, ArrayBuffer[String])])) = { import spark.implicits._ val branches = validBranches() @@ -139,7 +138,7 @@ case class SparkOrphanFilesClean( it => var deletedFilesCount = 0L var deletedFilesLenInBytes = 0L - val involvedDirectories = new mutable.HashSet[String]() + val involvedDirectories = new ArrayBuffer[String]() while (it.hasNext) { val fileInfo = it.next(); @@ -148,7 +147,7 @@ case class SparkOrphanFilesClean( deletedFilesLenInBytes += fileInfo.getLong(2) specifiedFileCleaner.accept(deletedPath) logInfo(s"Cleaned file: $pathToClean") - involvedDirectories += deletedPath.getParent.toUri.toString + 
involvedDirectories.append(deletedPath.getParent.toUri.toString) deletedFilesCount += 1 } logInfo( From 1c92c4051a97042b7a962df2905b17b85fd6bd95 Mon Sep 17 00:00:00 2001 From: Askwang <135721692+Askwang@users.noreply.github.com> Date: Fri, 3 Jan 2025 15:46:28 +0800 Subject: [PATCH 3/9] fix comments --- .../paimon/operation/OrphanFilesClean.java | 30 ++++++++++--------- .../spark/orphan/SparkOrphanFilesClean.scala | 7 +++-- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 950e1b26457e..53d63e394b29 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -56,13 +56,17 @@ import java.util.List; import java.util.Set; import java.util.TimeZone; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Predicate; +import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX; import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly; +import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; +import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute; /** * To remove the data files and metadata files that are not used by table (so-called "orphan @@ -93,6 +97,10 @@ public abstract class OrphanFilesClean implements Serializable { protected final int partitionKeysNum; protected final Path location; + private static final String THREAD_NAME = "ORPHAN-FILES-CLEAN-THREAD-POOL"; + private static final ThreadPoolExecutor executorService = + createCachedThreadPool(Runtime.getRuntime().availableProcessors(), THREAD_NAME); + public OrphanFilesClean( FileStoreTable table, long olderThanMillis, SerializableConsumer fileCleaner) { this.table = table; @@ -396,27 +404,21 @@ public void cleanEmptyDirectory(Set deletedPaths) { return; } - int level = 0; - while (level <= partitionKeysNum) { - Set parentPaths = new HashSet<>(); - for (Path path : deletedPaths) { - boolean deleted = tryDeleteEmptyDirectory(path); - if (deleted) { - LOG.info("Delete empty directory '{}'.", path); - parentPaths.add(path.getParent()); - } - } + randomlyOnlyExecute(executorService, this::tryDeleteEmptyDirectory, deletedPaths); + + for (int level = 0; level < partitionKeysNum; level++) { + Set parentPaths = + deletedPaths.stream().map(Path::getParent).collect(Collectors.toSet()); + randomlyOnlyExecute(executorService, this::tryDeleteEmptyDirectory, parentPaths); deletedPaths = new HashSet<>(parentPaths); - level++; } } - private boolean tryDeleteEmptyDirectory(Path path) { + private void tryDeleteEmptyDirectory(Path path) { try { - return fileIO.delete(path, false); + fileIO.delete(path, false); } catch (IOException e) { LOG.debug("Failed to delete directory '{}' because it is not empty.", path); - return false; } } } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala index dd34b4b44a56..b2f3c30dd6e9 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala +++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala @@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong import java.util.function.Consumer import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer case class SparkOrphanFilesClean( @@ -50,7 +51,7 @@ case class SparkOrphanFilesClean( with Logging { def doOrphanClean() - : (Dataset[(Long, Long)], (Dataset[BranchAndManifestFile], Dataset[(Long, Long, ArrayBuffer[String])])) = { + : (Dataset[(Long, Long)], (Dataset[BranchAndManifestFile], Dataset[(Long, Long, mutable.HashSet[String])])) = { import spark.implicits._ val branches = validBranches() @@ -138,7 +139,7 @@ case class SparkOrphanFilesClean( it => var deletedFilesCount = 0L var deletedFilesLenInBytes = 0L - val involvedDirectories = new ArrayBuffer[String]() + val involvedDirectories = new mutable.HashSet[String]() while (it.hasNext) { val fileInfo = it.next(); @@ -147,7 +148,7 @@ case class SparkOrphanFilesClean( deletedFilesLenInBytes += fileInfo.getLong(2) specifiedFileCleaner.accept(deletedPath) logInfo(s"Cleaned file: $pathToClean") - involvedDirectories.append(deletedPath.getParent.toUri.toString) + involvedDirectories.add(deletedPath.getParent.toUri.toString) deletedFilesCount += 1 } logInfo( From 27afea92ea017083466c5608baa28679269e6958 Mon Sep 17 00:00:00 2001 From: Askwang <135721692+Askwang@users.noreply.github.com> Date: Mon, 6 Jan 2025 16:27:17 +0800 Subject: [PATCH 4/9] 1 --- .../operation/LocalOrphanFilesClean.java | 20 ++++++++++++--- .../paimon/operation/OrphanFilesClean.java | 18 ++++--------- .../spark/orphan/SparkOrphanFilesClean.scala | 25 ++++++++++++++++--- 3 files changed, 43 insertions(+), 20 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index 8079c713cf29..41272e584947 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -52,6 +52,7 @@ import java.util.function.Function; import java.util.stream.Collectors; +import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX; import static org.apache.paimon.utils.Preconditions.checkArgument; import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialReturn; @@ -128,14 +129,27 @@ public CleanOrphanFilesResult clean() candidateDeletes.clear(); // clean empty directory - Set deletedPaths = - deleteFiles.stream().map(Path::getParent).collect(Collectors.toSet()); - cleanEmptyDirectory(deletedPaths); + cleanEmptyDirectory(deleteFiles); return new CleanOrphanFilesResult( deleteFiles.size(), deletedFilesLenInBytes.get(), deleteFiles); } + private void cleanEmptyDirectory(List deleteFiles) { + if (deleteFiles.isEmpty()) { + return; + } + Set bucketDirectory = + deleteFiles.stream() + .map(Path::getParent) + .filter(path -> path.toUri().toString().contains(BUCKET_PATH_PREFIX)) + .collect(Collectors.toSet()); + randomlyOnlyExecute(executor, this::tryDeleteEmptyDirectory, bucketDirectory); + + tryCleanPartitionDirectory( + bucketDirectory.stream().map(Path::getParent).collect(Collectors.toSet())); + } + private void collectWithoutDataFile( String branch, Consumer usedFileConsumer, Consumer manifestConsumer) throws 
IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 53d63e394b29..e0dad4158633 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -66,7 +66,6 @@ import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX; import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly; import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; -import static org.apache.paimon.utils.ThreadPoolUtils.randomlyOnlyExecute; /** * To remove the data files and metadata files that are not used by table (so-called "orphan @@ -399,22 +398,15 @@ public static long olderThanMillis(@Nullable String olderThan) { } } - public void cleanEmptyDirectory(Set deletedPaths) { - if (deletedPaths.isEmpty()) { - return; - } - - randomlyOnlyExecute(executorService, this::tryDeleteEmptyDirectory, deletedPaths); - + /** Try to clean empty partition directories. */ + protected void tryCleanPartitionDirectory(Set deletedPaths) { for (int level = 0; level < partitionKeysNum; level++) { - Set parentPaths = - deletedPaths.stream().map(Path::getParent).collect(Collectors.toSet()); - randomlyOnlyExecute(executorService, this::tryDeleteEmptyDirectory, parentPaths); - deletedPaths = new HashSet<>(parentPaths); + deletedPaths.forEach(this::tryDeleteEmptyDirectory); + deletedPaths = deletedPaths.stream().map(Path::getParent).collect(Collectors.toSet()); } } - private void tryDeleteEmptyDirectory(Path path) { + public void tryDeleteEmptyDirectory(Path path) { try { fileIO.delete(path, false); } catch (IOException e) { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala index b2f3c30dd6e9..f147f05ca7e7 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala @@ -25,6 +25,7 @@ import org.apache.paimon.manifest.{ManifestEntry, ManifestFile} import org.apache.paimon.operation.{CleanOrphanFilesResult, OrphanFilesClean} import org.apache.paimon.operation.OrphanFilesClean.retryReadingFiles import org.apache.paimon.table.FileStoreTable +import org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX import org.apache.paimon.utils.SerializableConsumer import org.apache.spark.internal.Logging @@ -157,10 +158,8 @@ case class SparkOrphanFilesClean( } .cache() - // clean empty directories - val deletedPaths = - deleted.flatMap { case (_, _, paths) => paths }.collect().map(new Path(_)).toSet - cleanEmptyDirectory(deletedPaths.asJava) + // clean empty directory + cleanEmptyDirectory(deleted.flatMap { case (_, _, paths) => paths }) val deletedResult = deleted.map { case (filesCount, filesLen, _) => (filesCount, filesLen) } val finalDeletedDataset = @@ -174,6 +173,24 @@ case class SparkOrphanFilesClean( (finalDeletedDataset, (usedManifestFiles, deleted)) } + + private def cleanEmptyDirectory(deletedPaths: Dataset[String]): Unit = { + import spark.implicits._ + + val partitionDirectory = deletedPaths + .filter(_.contains(BUCKET_PATH_PREFIX)) + .mapPartitions { + iter => + iter.map { + location => + val path = 
new Path(location) + tryDeleteEmptyDirectory(path) + path.getParent.toUri.toString + } + } + + tryCleanPartitionDirectory(partitionDirectory.collect().map(new Path(_)).toSet.asJava) + } } /** From 6bac2883f158d91018d8fa96707ec5fde1e9cb9a Mon Sep 17 00:00:00 2001 From: Askwang <135721692+Askwang@users.noreply.github.com> Date: Mon, 6 Jan 2025 17:16:45 +0800 Subject: [PATCH 5/9] 1 --- .../apache/paimon/spark/orphan/SparkOrphanFilesClean.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala index f147f05ca7e7..5039b9a2817e 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala @@ -52,7 +52,7 @@ case class SparkOrphanFilesClean( with Logging { def doOrphanClean() - : (Dataset[(Long, Long)], (Dataset[BranchAndManifestFile], Dataset[(Long, Long, mutable.HashSet[String])])) = { + : (Dataset[(Long, Long)], (Dataset[BranchAndManifestFile], Dataset[(Long, Long, Set[String])])) = { import spark.implicits._ val branches = validBranches() @@ -154,7 +154,7 @@ case class SparkOrphanFilesClean( } logInfo( s"Total cleaned files: $deletedFilesCount, Total cleaned files len : $deletedFilesLenInBytes") - Iterator.single((deletedFilesCount, deletedFilesLenInBytes, involvedDirectories)) + Iterator.single((deletedFilesCount, deletedFilesLenInBytes, involvedDirectories.toSet)) } .cache() From 28e50eba6e3974bbdf2450b37a0bc5b56ef533c2 Mon Sep 17 00:00:00 2001 From: Askwang <135721692+Askwang@users.noreply.github.com> Date: Thu, 9 Jan 2025 19:58:20 +0800 Subject: [PATCH 6/9] fix comments --- .../operation/LocalOrphanFilesClean.java | 25 +++++++---- .../paimon/operation/OrphanFilesClean.java | 15 ++++--- .../procedure/RemoveOrphanFilesProcedure.java | 3 +- .../procedure/RemoveOrphanFilesProcedure.java | 3 +- .../procedure/RemoveOrphanFilesProcedure.java | 22 +++++----- .../spark/orphan/SparkOrphanFilesClean.scala | 42 ++++++++++--------- 6 files changed, 66 insertions(+), 44 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index 41272e584947..3f6b72f73bc6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -70,6 +70,8 @@ public class LocalOrphanFilesClean extends OrphanFilesClean { private final List deleteFiles; + private final boolean dryRun; + private final AtomicLong deletedFilesLenInBytes = new AtomicLong(0); private Set candidateDeletes; @@ -79,16 +81,20 @@ public LocalOrphanFilesClean(FileStoreTable table) { } public LocalOrphanFilesClean(FileStoreTable table, long olderThanMillis) { - this(table, olderThanMillis, path -> table.fileIO().deleteQuietly(path)); + this(table, olderThanMillis, path -> table.fileIO().deleteQuietly(path), false); } public LocalOrphanFilesClean( - FileStoreTable table, long olderThanMillis, SerializableConsumer fileCleaner) { + FileStoreTable table, + long olderThanMillis, + SerializableConsumer fileCleaner, + boolean dryRun) { super(table, olderThanMillis, fileCleaner); this.deleteFiles = new ArrayList<>(); 
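        // Note: the cached pool below is sized by the table's delete-file-thread-num
        // option and is reused by the empty-directory sweep; the new dryRun flag lets
        // clean() skip that destructive sweep while still reporting what would be deleted.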
this.executor = createCachedThreadPool( table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN"); + this.dryRun = dryRun; } public CleanOrphanFilesResult clean() @@ -129,7 +135,9 @@ public CleanOrphanFilesResult clean() candidateDeletes.clear(); // clean empty directory - cleanEmptyDirectory(deleteFiles); + if (!dryRun) { + cleanEmptyDirectory(deleteFiles); + } return new CleanOrphanFilesResult( deleteFiles.size(), deletedFilesLenInBytes.get(), deleteFiles); @@ -230,7 +238,8 @@ public static List createOrphanFilesCleans( @Nullable String tableName, long olderThanMillis, SerializableConsumer fileCleaner, - @Nullable Integer parallelism) + @Nullable Integer parallelism, + boolean dryRun) throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException { List tableNames = Collections.singletonList(tableName); if (tableName == null || "*".equals(tableName)) { @@ -259,7 +268,7 @@ public static List createOrphanFilesCleans( orphanFilesCleans.add( new LocalOrphanFilesClean( - (FileStoreTable) table, olderThanMillis, fileCleaner)); + (FileStoreTable) table, olderThanMillis, fileCleaner, dryRun)); } return orphanFilesCleans; @@ -271,7 +280,8 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles( @Nullable String tableName, long olderThanMillis, SerializableConsumer fileCleaner, - @Nullable Integer parallelism) + @Nullable Integer parallelism, + boolean dryRun) throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException { List tableCleans = createOrphanFilesCleans( @@ -280,7 +290,8 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles( tableName, olderThanMillis, fileCleaner, - parallelism); + parallelism, + dryRun); ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index e0dad4158633..db4c0071c2b8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -399,18 +399,21 @@ public static long olderThanMillis(@Nullable String olderThan) { } /** Try to clean empty partition directories. 
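     * Each pass deletes the directories that are already empty and promotes their
     * parents to the next pass, so at most {@code partitionKeysNum} levels are walked
     * and the table root itself is never removed.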
*/ - protected void tryCleanPartitionDirectory(Set deletedPaths) { + protected void tryCleanPartitionDirectory(Set partitionDirs) { for (int level = 0; level < partitionKeysNum; level++) { - deletedPaths.forEach(this::tryDeleteEmptyDirectory); - deletedPaths = deletedPaths.stream().map(Path::getParent).collect(Collectors.toSet()); + partitionDirs = + partitionDirs.stream() + .filter(this::tryDeleteEmptyDirectory) + .map(Path::getParent) + .collect(Collectors.toSet()); } } - public void tryDeleteEmptyDirectory(Path path) { + public boolean tryDeleteEmptyDirectory(Path path) { try { - fileIO.delete(path, false); + return fileIO.delete(path, false); } catch (IOException e) { - LOG.debug("Failed to delete directory '{}' because it is not empty.", path); + return false; } } } diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java index b4a3a6b359d9..4f8217ffce40 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java @@ -110,7 +110,8 @@ public String[] call( tableName, olderThanMillis(olderThan), createFileCleaner(catalog, dryRun), - parallelism); + parallelism, + dryRun); break; default: throw new IllegalArgumentException( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java index 4cd1b3e00303..8634e1e5e3f7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java @@ -98,7 +98,8 @@ public String[] call( tableName, olderThanMillis(olderThan), createFileCleaner(catalog, dryRun), - parallelism); + parallelism, + dryRun); break; default: throw new IllegalArgumentException( diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java index a929641106c6..dd5826420036 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java @@ -90,6 +90,10 @@ public StructType outputType() { public InternalRow[] call(InternalRow args) { org.apache.paimon.catalog.Identifier identifier; String tableId = args.getString(0); + String olderThan = args.isNullAt(1) ? null : args.getString(1); + boolean dryRun = !args.isNullAt(2) && args.getBoolean(2); + Integer parallelism = args.isNullAt(3) ? null : args.getInt(3); + Preconditions.checkArgument( tableId != null && !tableId.isEmpty(), "Cannot handle an empty tableId for argument %s", @@ -116,11 +120,10 @@ public InternalRow[] call(InternalRow args) { catalog, identifier.getDatabaseName(), identifier.getTableName(), - OrphanFilesClean.olderThanMillis( - args.isNullAt(1) ? 
null : args.getString(1)), - OrphanFilesClean.createFileCleaner( - catalog, !args.isNullAt(2) && args.getBoolean(2)), - args.isNullAt(3) ? null : args.getInt(3)); + OrphanFilesClean.olderThanMillis(olderThan), + OrphanFilesClean.createFileCleaner(catalog, dryRun), + parallelism, + dryRun); break; case "DISTRIBUTED": cleanOrphanFilesResult = @@ -128,11 +131,10 @@ public InternalRow[] call(InternalRow args) { catalog, identifier.getDatabaseName(), identifier.getTableName(), - OrphanFilesClean.olderThanMillis( - args.isNullAt(1) ? null : args.getString(1)), - OrphanFilesClean.createFileCleaner( - catalog, !args.isNullAt(2) && args.getBoolean(2)), - args.isNullAt(3) ? null : args.getInt(3)); + OrphanFilesClean.olderThanMillis(olderThan), + OrphanFilesClean.createFileCleaner(catalog, dryRun), + parallelism, + dryRun); break; default: throw new IllegalArgumentException( diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala index 5039b9a2817e..d25b0fffba55 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala @@ -46,13 +46,13 @@ case class SparkOrphanFilesClean( specifiedOlderThanMillis: Long, specifiedFileCleaner: SerializableConsumer[Path], parallelism: Int, + dryRun: Boolean, @transient spark: SparkSession) extends OrphanFilesClean(specifiedTable, specifiedOlderThanMillis, specifiedFileCleaner) with SQLConfHelper with Logging { - def doOrphanClean() - : (Dataset[(Long, Long)], (Dataset[BranchAndManifestFile], Dataset[(Long, Long, Set[String])])) = { + def doOrphanClean(): (Dataset[(Long, Long)], Dataset[BranchAndManifestFile]) = { import spark.implicits._ val branches = validBranches() @@ -140,7 +140,7 @@ case class SparkOrphanFilesClean( it => var deletedFilesCount = 0L var deletedFilesLenInBytes = 0L - val involvedDirectories = new mutable.HashSet[String]() + val bucketDirs = new mutable.HashSet[String]() while (it.hasNext) { val fileInfo = it.next(); @@ -149,29 +149,35 @@ case class SparkOrphanFilesClean( deletedFilesLenInBytes += fileInfo.getLong(2) specifiedFileCleaner.accept(deletedPath) logInfo(s"Cleaned file: $pathToClean") - involvedDirectories.add(deletedPath.getParent.toUri.toString) + bucketDirs.add(deletedPath.getParent.toUri.toString) deletedFilesCount += 1 } + + // clean empty directory + if (!dryRun) { + val partitionDirs = bucketDirs + .filter(_.contains(BUCKET_PATH_PREFIX)) + .map(new Path(_)) + .filter(tryDeleteEmptyDirectory) + .map(_.getParent) + tryCleanPartitionDirectory(partitionDirs.asJava) + } + logInfo( s"Total cleaned files: $deletedFilesCount, Total cleaned files len : $deletedFilesLenInBytes") - Iterator.single((deletedFilesCount, deletedFilesLenInBytes, involvedDirectories.toSet)) + Iterator.single((deletedFilesCount, deletedFilesLenInBytes)) } - .cache() - - // clean empty directory - cleanEmptyDirectory(deleted.flatMap { case (_, _, paths) => paths }) - val deletedResult = deleted.map { case (filesCount, filesLen, _) => (filesCount, filesLen) } val finalDeletedDataset = if (deletedFilesCountInLocal.get() != 0 || deletedFilesLenInBytesInLocal.get() != 0) { - deletedResult.union( + deleted.union( spark.createDataset( Seq((deletedFilesCountInLocal.get(), deletedFilesLenInBytesInLocal.get())))) } else { - 
deletedResult + deleted } - (finalDeletedDataset, (usedManifestFiles, deleted)) + (finalDeletedDataset, usedManifestFiles) } private def cleanEmptyDirectory(deletedPaths: Dataset[String]): Unit = { @@ -210,7 +216,8 @@ object SparkOrphanFilesClean extends SQLConfHelper { tableName: String, olderThanMillis: Long, fileCleaner: SerializableConsumer[Path], - parallelismOpt: Integer): CleanOrphanFilesResult = { + parallelismOpt: Integer, + dryRun: Boolean): CleanOrphanFilesResult = { val spark = SparkSession.active val parallelism = if (parallelismOpt == null) { Math.max(spark.sparkContext.defaultParallelism, conf.numShufflePartitions) @@ -242,6 +249,7 @@ object SparkOrphanFilesClean extends SQLConfHelper { olderThanMillis, fileCleaner, parallelism, + dryRun, spark ).doOrphanClean() }.unzip @@ -259,11 +267,7 @@ object SparkOrphanFilesClean extends SQLConfHelper { new CleanOrphanFilesResult(result.getLong(0), result.getLong(1)) } } finally { - waitToRelease.foreach { - case (usedManifestFiles, deleted) => - usedManifestFiles.unpersist() - deleted.unpersist() - } + waitToRelease.foreach(_.unpersist()) } } } From 3bcbd7d5817d41b564466c7d21d04faca535aeb0 Mon Sep 17 00:00:00 2001 From: Askwang <135721692+Askwang@users.noreply.github.com> Date: Thu, 9 Jan 2025 20:31:06 +0800 Subject: [PATCH 7/9] fix --- .../paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala index 7d6af4a2ebd3..248ba863cb62 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala @@ -219,7 +219,6 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase { checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil) } - test("Paimon procedure: remove orphan files with data file path directory") { sql(s""" |CREATE TABLE T (id STRING, name STRING) From 44b9cf528dcd2928635aae694853f4f08504f64f Mon Sep 17 00:00:00 2001 From: Askwang <135721692+Askwang@users.noreply.github.com> Date: Fri, 10 Jan 2025 16:22:54 +0800 Subject: [PATCH 8/9] fix comments --- .../operation/LocalOrphanFilesClean.java | 14 ++++---- .../paimon/operation/OrphanFilesClean.java | 16 +++------ .../spark/orphan/SparkOrphanFilesClean.scala | 35 +++++-------------- 3 files changed, 22 insertions(+), 43 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index 3f6b72f73bc6..551f26d17419 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -136,26 +136,28 @@ public CleanOrphanFilesResult clean() // clean empty directory if (!dryRun) { - cleanEmptyDirectory(deleteFiles); + cleanEmptyDataDirectory(deleteFiles); } return new CleanOrphanFilesResult( deleteFiles.size(), deletedFilesLenInBytes.get(), deleteFiles); } - private void cleanEmptyDirectory(List deleteFiles) { + private void cleanEmptyDataDirectory(List deleteFiles) { if (deleteFiles.isEmpty()) { return; } - Set bucketDirectory = + Set bucketDirs = 
deleteFiles.stream() .map(Path::getParent) .filter(path -> path.toUri().toString().contains(BUCKET_PATH_PREFIX)) .collect(Collectors.toSet()); - randomlyOnlyExecute(executor, this::tryDeleteEmptyDirectory, bucketDirectory); + randomlyOnlyExecute(executor, this::tryDeleteEmptyDirectory, bucketDirs); - tryCleanPartitionDirectory( - bucketDirectory.stream().map(Path::getParent).collect(Collectors.toSet())); + // Clean partition directory individually to avoiding conflicts + Set partitionDirs = + bucketDirs.stream().map(Path::getParent).collect(Collectors.toSet()); + tryCleanDataDirectory(partitionDirs, partitionKeysNum); } private void collectWithoutDataFile( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index db4c0071c2b8..8a11e9fc22c0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -56,7 +56,6 @@ import java.util.List; import java.util.Set; import java.util.TimeZone; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Predicate; @@ -65,7 +64,6 @@ import static java.util.Collections.emptyList; import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX; import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly; -import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; /** * To remove the data files and metadata files that are not used by table (so-called "orphan @@ -96,10 +94,6 @@ public abstract class OrphanFilesClean implements Serializable { protected final int partitionKeysNum; protected final Path location; - private static final String THREAD_NAME = "ORPHAN-FILES-CLEAN-THREAD-POOL"; - private static final ThreadPoolExecutor executorService = - createCachedThreadPool(Runtime.getRuntime().availableProcessors(), THREAD_NAME); - public OrphanFilesClean( FileStoreTable table, long olderThanMillis, SerializableConsumer fileCleaner) { this.table = table; @@ -398,11 +392,11 @@ public static long olderThanMillis(@Nullable String olderThan) { } } - /** Try to clean empty partition directories. */ - protected void tryCleanPartitionDirectory(Set partitionDirs) { - for (int level = 0; level < partitionKeysNum; level++) { - partitionDirs = - partitionDirs.stream() + /** Try to clean empty data directories. 
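     * The walk starts from the given directories (bucket directories in practice) and
     * moves one level up per round; {@code maxLevel} bounds it, so callers pass
     * {@code partitionKeysNum} when starting from the bucket parents and
     * {@code partitionKeysNum + 1} when starting from the bucket level itself, keeping
     * the table root out of reach.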
*/ + protected void tryCleanDataDirectory(Set dataDirs, int maxLevel) { + for (int level = 0; level < maxLevel; level++) { + dataDirs = + dataDirs.stream() .filter(this::tryDeleteEmptyDirectory) .map(Path::getParent) .collect(Collectors.toSet()); diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala index d25b0fffba55..4798368f56a8 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala @@ -127,20 +127,23 @@ case class SparkOrphanFilesClean( .flatMap { dir => tryBestListingDirs(new Path(dir)).asScala.filter(oldEnough).map { - file => (file.getPath.getName, file.getPath.toUri.toString, file.getLen) + file => + val path = file.getPath + (path.getName, path.toUri.toString, file.getLen, path.getParent.toUri.toString) } } - .toDF("name", "path", "len") + .toDF("name", "path", "len", "dataDir") .repartition(parallelism) // use left anti to filter files which is not used val deleted = candidates .join(usedFiles, $"name" === $"used_name", "left_anti") + .repartition($"dataDir") .mapPartitions { it => var deletedFilesCount = 0L var deletedFilesLenInBytes = 0L - val bucketDirs = new mutable.HashSet[String]() + val dataDirs = new mutable.HashSet[String]() while (it.hasNext) { val fileInfo = it.next(); @@ -149,18 +152,16 @@ case class SparkOrphanFilesClean( deletedFilesLenInBytes += fileInfo.getLong(2) specifiedFileCleaner.accept(deletedPath) logInfo(s"Cleaned file: $pathToClean") - bucketDirs.add(deletedPath.getParent.toUri.toString) + dataDirs.add(deletedPath.getParent.toUri.toString) deletedFilesCount += 1 } // clean empty directory if (!dryRun) { - val partitionDirs = bucketDirs + val bucketDirs = dataDirs .filter(_.contains(BUCKET_PATH_PREFIX)) .map(new Path(_)) - .filter(tryDeleteEmptyDirectory) - .map(_.getParent) - tryCleanPartitionDirectory(partitionDirs.asJava) + tryCleanDataDirectory(bucketDirs.asJava, partitionKeysNum + 1) } logInfo( @@ -179,24 +180,6 @@ case class SparkOrphanFilesClean( (finalDeletedDataset, usedManifestFiles) } - - private def cleanEmptyDirectory(deletedPaths: Dataset[String]): Unit = { - import spark.implicits._ - - val partitionDirectory = deletedPaths - .filter(_.contains(BUCKET_PATH_PREFIX)) - .mapPartitions { - iter => - iter.map { - location => - val path = new Path(location) - tryDeleteEmptyDirectory(path) - path.getParent.toUri.toString - } - } - - tryCleanPartitionDirectory(partitionDirectory.collect().map(new Path(_)).toSet.asJava) - } } /** From 5ff7804fdaecb010c22d751314740554f4b2d442 Mon Sep 17 00:00:00 2001 From: Askwang <135721692+Askwang@users.noreply.github.com> Date: Fri, 10 Jan 2025 17:57:55 +0800 Subject: [PATCH 9/9] 1 --- .../org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala index 4798368f56a8..16b896937961 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala +++ 
b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala @@ -152,7 +152,7 @@ case class SparkOrphanFilesClean( deletedFilesLenInBytes += fileInfo.getLong(2) specifiedFileCleaner.accept(deletedPath) logInfo(s"Cleaned file: $pathToClean") - dataDirs.add(deletedPath.getParent.toUri.toString) + dataDirs.add(fileInfo.getString(3)) deletedFilesCount += 1 }
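
Editor's summary of the technique. The final hunk closes the loop on the Spark side: each row already carries its parent directory in column 3 (the `dataDir` column the dataset was repartitioned by), so `fileInfo.getString(3)` avoids re-deriving the parent from the path for every deleted file and keeps the directory set aligned with the partitioning key. For readers who want the cleanup logic in one place, below is a minimal, self-contained sketch of the bottom-up sweep the series converges on (patches 6 and 8). The names tryDeleteEmptyDirectory, tryCleanDataDirectory, and the partition-key bound mirror the patch; the java.nio.file scaffolding and the main() walkthrough are illustrative stand-ins, not Paimon APIs.

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class EmptyDirSweepSketch {

        // Mirrors tryDeleteEmptyDirectory: a non-recursive delete succeeds only on
        // an empty directory, so the failure case doubles as the "not empty, keep
        // it" signal and no extra directory listing is needed.
        static boolean tryDeleteEmptyDirectory(Path dir) {
            try {
                Files.delete(dir); // throws DirectoryNotEmptyException if non-empty
                return true;
            } catch (IOException e) {
                return false;
            }
        }

        // Mirrors tryCleanDataDirectory: each round keeps only the directories that
        // were actually deleted and promotes their parents to the next round.
        // maxLevel bounds the upward walk, so the table root is never deleted.
        static void tryCleanDataDirectory(Set<Path> dataDirs, int maxLevel) {
            for (int level = 0; level < maxLevel; level++) {
                dataDirs =
                        dataDirs.stream()
                                .filter(EmptyDirSweepSketch::tryDeleteEmptyDirectory)
                                .map(Path::getParent)
                                .collect(Collectors.toSet());
            }
        }

        public static void main(String[] args) throws IOException {
            // table/pt=2024-06-01/bucket-0, emptied by an earlier orphan-file clean
            Path table = Files.createTempDirectory("paimon-table");
            Path bucket = Files.createDirectories(table.resolve("pt=2024-06-01/bucket-0"));

            Set<Path> bucketDirs = new HashSet<>();
            bucketDirs.add(bucket);

            // one partition key + the bucket level itself => maxLevel = 2
            tryCleanDataDirectory(bucketDirs, 2);

            System.out.println(Files.exists(bucket));                         // false
            System.out.println(Files.exists(table.resolve("pt=2024-06-01"))); // false
            System.out.println(Files.exists(table));                          // true
        }
    }

A note on the design: deleting non-recursively and treating failure as "directory still in use" makes the sweep safe to run concurrently with writers, which is why the patch drops the earlier shared static thread pool in favor of per-instance execution and level-by-level promotion.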