diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
index 6a4276662468..551f26d17419 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java
@@ -52,6 +52,7 @@ import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
 import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialReturn;
@@ -69,6 +70,8 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {
 
     private final List<Path> deleteFiles;
 
+    private final boolean dryRun;
+
     private final AtomicLong deletedFilesLenInBytes = new AtomicLong(0);
 
     private Set<String> candidateDeletes;
@@ -78,16 +81,20 @@ public LocalOrphanFilesClean(FileStoreTable table) {
     }
 
     public LocalOrphanFilesClean(FileStoreTable table, long olderThanMillis) {
-        this(table, olderThanMillis, path -> table.fileIO().deleteQuietly(path));
+        this(table, olderThanMillis, path -> table.fileIO().deleteQuietly(path), false);
     }
 
     public LocalOrphanFilesClean(
-            FileStoreTable table, long olderThanMillis, SerializableConsumer<Path> fileCleaner) {
+            FileStoreTable table,
+            long olderThanMillis,
+            SerializableConsumer<Path> fileCleaner,
+            boolean dryRun) {
         super(table, olderThanMillis, fileCleaner);
         this.deleteFiles = new ArrayList<>();
         this.executor =
                 createCachedThreadPool(
                         table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN");
+        this.dryRun = dryRun;
     }
 
     public CleanOrphanFilesResult clean()
@@ -127,10 +134,32 @@ public CleanOrphanFilesResult clean()
                         .collect(Collectors.toList()));
         candidateDeletes.clear();
 
+        // clean empty directories
+        if (!dryRun) {
+            cleanEmptyDataDirectory(deleteFiles);
+        }
+
         return new CleanOrphanFilesResult(
                 deleteFiles.size(), deletedFilesLenInBytes.get(), deleteFiles);
     }
 
+    private void cleanEmptyDataDirectory(List<Path> deleteFiles) {
+        if (deleteFiles.isEmpty()) {
+            return;
+        }
+        Set<Path> bucketDirs =
+                deleteFiles.stream()
+                        .map(Path::getParent)
+                        .filter(path -> path.toUri().toString().contains(BUCKET_PATH_PREFIX))
+                        .collect(Collectors.toSet());
+        randomlyOnlyExecute(executor, this::tryDeleteEmptyDirectory, bucketDirs);
+
+        // Clean partition directories individually to avoid conflicts
+        Set<Path> partitionDirs =
+                bucketDirs.stream().map(Path::getParent).collect(Collectors.toSet());
+        tryCleanDataDirectory(partitionDirs, partitionKeysNum);
+    }
+
     private void collectWithoutDataFile(
             String branch, Consumer<String> usedFileConsumer, Consumer<String> manifestConsumer)
             throws IOException {
@@ -211,7 +240,8 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
             @Nullable String tableName,
             long olderThanMillis,
             SerializableConsumer<Path> fileCleaner,
-            @Nullable Integer parallelism)
+            @Nullable Integer parallelism,
+            boolean dryRun)
             throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
         List<String> tableNames = Collections.singletonList(tableName);
         if (tableName == null || "*".equals(tableName)) {
@@ -240,7 +270,7 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
 
             orphanFilesCleans.add(
                     new LocalOrphanFilesClean(
-                            (FileStoreTable) table, olderThanMillis, fileCleaner));
+                            (FileStoreTable) table, olderThanMillis, fileCleaner, dryRun));
         }
 
         return orphanFilesCleans;
@@ -252,7 +282,8 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
             @Nullable String tableName,
             long olderThanMillis,
             SerializableConsumer<Path> fileCleaner,
-            @Nullable Integer parallelism)
+            @Nullable Integer parallelism,
+            boolean dryRun)
             throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
         List<LocalOrphanFilesClean> tableCleans =
                 createOrphanFilesCleans(
@@ -261,7 +292,8 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
                         tableName,
                         olderThanMillis,
                         fileCleaner,
-                        parallelism);
+                        parallelism,
+                        dryRun);
 
         ExecutorService executorService =
                 Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
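As a reviewer aid, here is roughly how the new four-argument constructor is driven in dry-run mode. This is a minimal sketch, not code from this PR: `DryRunExample` and the no-op cleaner are hypothetical, and only the constructor and `clean()` signatures are taken from the diff above.

```java
import org.apache.paimon.fs.Path;
import org.apache.paimon.operation.CleanOrphanFilesResult;
import org.apache.paimon.operation.LocalOrphanFilesClean;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SerializableConsumer;

public class DryRunExample {

    // In dry-run mode nothing should touch the file system: the cleaner is a
    // no-op, and dryRun = true additionally skips the new
    // cleanEmptyDataDirectory pass, so candidates are only reported.
    public static CleanOrphanFilesResult report(FileStoreTable table, long olderThanMillis)
            throws Exception {
        SerializableConsumer<Path> noopCleaner = path -> {};
        return new LocalOrphanFilesClean(table, olderThanMillis, noopCleaner, true).clean();
    }
}
```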
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
index 54e082091840..8a11e9fc22c0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java
@@ -59,6 +59,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import static java.util.Collections.emptyList;
 import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX;
@@ -390,4 +391,23 @@ public static long olderThanMillis(@Nullable String olderThan) {
             return parsedTimestampData.getMillisecond();
         }
     }
+
+    /** Try to clean empty data directories. */
+    protected void tryCleanDataDirectory(Set<Path> dataDirs, int maxLevel) {
+        for (int level = 0; level < maxLevel; level++) {
+            dataDirs =
+                    dataDirs.stream()
+                            .filter(this::tryDeleteEmptyDirectory)
+                            .map(Path::getParent)
+                            .collect(Collectors.toSet());
+        }
+    }
+
+    public boolean tryDeleteEmptyDirectory(Path path) {
+        try {
+            return fileIO.delete(path, false);
+        } catch (IOException e) {
+            return false;
+        }
+    }
 }
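The level-by-level walk above is the core of the directory cleanup: `fileIO.delete(path, false)` is a non-recursive delete, so it only succeeds on an already-empty directory, and each round keeps just the directories that were actually deleted before stepping up to their parents. Below is a self-contained sketch of the same technique over `java.nio`, for illustration only: local directories stand in for Paimon's `FileIO`, and `deleteIfEmpty` is a hypothetical stand-in for `fileIO.delete(path, false)`.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Set;
import java.util.stream.Collectors;

public class EmptyDirWalk {

    // Mirrors a non-recursive delete: Files.delete fails with
    // DirectoryNotEmptyException when the directory still has children,
    // which we translate into "not deleted".
    static boolean deleteIfEmpty(Path dir) {
        try {
            Files.delete(dir);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // One round per directory level: keep only the directories that were
    // actually removed, then retry on their parents. A directory that still
    // holds live files blocks the walk for its whole parent chain.
    static void tryCleanDataDirectory(Set<Path> dataDirs, int maxLevel) {
        for (int level = 0; level < maxLevel; level++) {
            dataDirs =
                    dataDirs.stream()
                            .filter(EmptyDirWalk::deleteIfEmpty)
                            .map(Path::getParent)
                            .collect(Collectors.toSet());
        }
    }
}
```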
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index b4a3a6b359d9..4f8217ffce40 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -110,7 +110,8 @@ public String[] call(
                                 tableName,
                                 olderThanMillis(olderThan),
                                 createFileCleaner(catalog, dryRun),
-                                parallelism);
+                                parallelism,
+                                dryRun);
                 break;
             default:
                 throw new IllegalArgumentException(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
index 4cd1b3e00303..8634e1e5e3f7 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveOrphanFilesProcedure.java
@@ -98,7 +98,8 @@ public String[] call(
                                 tableName,
                                 olderThanMillis(olderThan),
                                 createFileCleaner(catalog, dryRun),
-                                parallelism);
+                                parallelism,
+                                dryRun);
                 break;
             default:
                 throw new IllegalArgumentException(
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
index a929641106c6..dd5826420036 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
@@ -90,6 +90,10 @@ public StructType outputType() {
     public InternalRow[] call(InternalRow args) {
         org.apache.paimon.catalog.Identifier identifier;
         String tableId = args.getString(0);
+        String olderThan = args.isNullAt(1) ? null : args.getString(1);
+        boolean dryRun = !args.isNullAt(2) && args.getBoolean(2);
+        Integer parallelism = args.isNullAt(3) ? null : args.getInt(3);
+
         Preconditions.checkArgument(
                 tableId != null && !tableId.isEmpty(),
                 "Cannot handle an empty tableId for argument %s",
@@ -116,11 +120,10 @@ public InternalRow[] call(InternalRow args) {
                                     catalog,
                                     identifier.getDatabaseName(),
                                     identifier.getTableName(),
-                                    OrphanFilesClean.olderThanMillis(
-                                            args.isNullAt(1) ? null : args.getString(1)),
-                                    OrphanFilesClean.createFileCleaner(
-                                            catalog, !args.isNullAt(2) && args.getBoolean(2)),
-                                    args.isNullAt(3) ? null : args.getInt(3));
+                                    OrphanFilesClean.olderThanMillis(olderThan),
+                                    OrphanFilesClean.createFileCleaner(catalog, dryRun),
+                                    parallelism,
+                                    dryRun);
                     break;
                 case "DISTRIBUTED":
                     cleanOrphanFilesResult =
@@ -128,11 +131,10 @@ public InternalRow[] call(InternalRow args) {
                                     catalog,
                                     identifier.getDatabaseName(),
                                     identifier.getTableName(),
-                                    OrphanFilesClean.olderThanMillis(
-                                            args.isNullAt(1) ? null : args.getString(1)),
-                                    OrphanFilesClean.createFileCleaner(
-                                            catalog, !args.isNullAt(2) && args.getBoolean(2)),
-                                    args.isNullAt(3) ? null : args.getInt(3));
+                                    OrphanFilesClean.olderThanMillis(olderThan),
+                                    OrphanFilesClean.createFileCleaner(catalog, dryRun),
+                                    parallelism,
+                                    dryRun);
                     break;
                 default:
                     throw new IllegalArgumentException(
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
index fca0493ede28..16b896937961 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
@@ -25,6 +25,7 @@ import org.apache.paimon.manifest.{ManifestEntry, ManifestFile}
 import org.apache.paimon.operation.{CleanOrphanFilesResult, OrphanFilesClean}
 import org.apache.paimon.operation.OrphanFilesClean.retryReadingFiles
 import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX
 import org.apache.paimon.utils.SerializableConsumer
 
 import org.apache.spark.internal.Logging
@@ -37,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLong
 import java.util.function.Consumer
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 case class SparkOrphanFilesClean(
@@ -44,6 +46,7 @@ case class SparkOrphanFilesClean(
     specifiedOlderThanMillis: Long,
     specifiedFileCleaner: SerializableConsumer[Path],
     parallelism: Int,
+    dryRun: Boolean,
     @transient spark: SparkSession)
   extends OrphanFilesClean(specifiedTable, specifiedOlderThanMillis, specifiedFileCleaner)
   with SQLConfHelper
@@ -124,19 +127,23 @@ case class SparkOrphanFilesClean(
         .flatMap {
           dir =>
             tryBestListingDirs(new Path(dir)).asScala.filter(oldEnough).map {
-              file => (file.getPath.getName, file.getPath.toUri.toString, file.getLen)
+              file =>
+                val path = file.getPath
+                (path.getName, path.toUri.toString, file.getLen, path.getParent.toUri.toString)
             }
         }
-        .toDF("name", "path", "len")
+        .toDF("name", "path", "len", "dataDir")
        .repartition(parallelism)
 
     // use left anti to filter files which is not used
     val deleted = candidates
       .join(usedFiles, $"name" === $"used_name", "left_anti")
+      .repartition($"dataDir")
       .mapPartitions {
         it =>
           var deletedFilesCount = 0L
           var deletedFilesLenInBytes = 0L
+          val dataDirs = new mutable.HashSet[String]()
 
           while (it.hasNext) {
             val fileInfo = it.next();
@@ -145,12 +152,23 @@ case class SparkOrphanFilesClean(
             deletedFilesLenInBytes += fileInfo.getLong(2)
             specifiedFileCleaner.accept(deletedPath)
             logInfo(s"Cleaned file: $pathToClean")
+            dataDirs.add(fileInfo.getString(3))
             deletedFilesCount += 1
           }
+
+          // clean empty directories
+          if (!dryRun) {
+            val bucketDirs = dataDirs
+              .filter(_.contains(BUCKET_PATH_PREFIX))
+              .map(new Path(_))
+            tryCleanDataDirectory(bucketDirs.asJava, partitionKeysNum + 1)
+          }
+
           logInfo(
             s"Total cleaned files: $deletedFilesCount, Total cleaned files len : $deletedFilesLenInBytes")
           Iterator.single((deletedFilesCount, deletedFilesLenInBytes))
       }
+
     val finalDeletedDataset =
       if (deletedFilesCountInLocal.get() != 0 || deletedFilesLenInBytesInLocal.get() != 0) {
         deleted.union(
@@ -181,7 +199,8 @@ object SparkOrphanFilesClean extends SQLConfHelper {
       tableName: String,
       olderThanMillis: Long,
       fileCleaner: SerializableConsumer[Path],
-      parallelismOpt: Integer): CleanOrphanFilesResult = {
+      parallelismOpt: Integer,
+      dryRun: Boolean): CleanOrphanFilesResult = {
     val spark = SparkSession.active
     val parallelism = if (parallelismOpt == null) {
       Math.max(spark.sparkContext.defaultParallelism, conf.numShufflePartitions)
@@ -213,6 +232,7 @@ object SparkOrphanFilesClean extends SQLConfHelper {
           olderThanMillis,
           fileCleaner,
           parallelism,
+          dryRun,
           spark
         ).doOrphanClean()
     }.unzip
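Design note on the distributed path: `repartition($"dataDir")` co-locates all orphan files that share a parent directory in the same task, so each task can attempt the empty-directory cleanup for exactly the directories it deleted from, without racing other executors. The walk also starts one level deeper than in `LocalOrphanFilesClean` (at the bucket directory instead of the partition directory), which is why it passes `partitionKeysNum + 1`. Below is a small sketch of that level arithmetic, assuming the usual `<table>/<pt>=<value>/bucket-<n>/<file>` layout; the path is made up.

```java
import org.apache.paimon.fs.Path;

public class LevelArithmetic {
    public static void main(String[] args) {
        // One partition key => partitionKeysNum = 1.
        Path file = new Path("/warehouse/db.db/T/pt=2024-06-01/bucket-0/data-0.orc");
        Path dir = file.getParent(); // .../pt=2024-06-01/bucket-0
        int partitionKeysNum = 1;
        // partitionKeysNum + 1 rounds cover the bucket level plus every
        // partition level; the table root itself is never a deletion target.
        for (int level = 0; level < partitionKeysNum + 1; level++) {
            System.out.println("level " + level + ": " + dir);
            dir = dir.getParent();
        }
    }
}
```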
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
index f45655d5147c..248ba863cb62 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedureTest.scala
@@ -240,4 +240,75 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
       sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than')"),
       Row(1, 1) :: Nil)
   }
+
+  test("Paimon procedure: clean empty directory after removing orphan files") {
+    spark.sql("""
+                |CREATE TABLE T (k STRING, pt STRING)
+                |using paimon TBLPROPERTIES ('primary-key'='k,pt', 'bucket'='1',
+                |'snapshot.clean-empty-directories'='false') PARTITIONED BY (pt);
+                |""".stripMargin)
+
+    spark.sql("""
+                |insert into T values
+                |("a", "2024-06-02"),("b", "2024-06-02"),("d", "2024-06-03"),
+                |("c", "2024-06-01"),("Never-expire", "9999-09-09");
+                |
+                |""".stripMargin)
+
+    // by default, no file is deleted
+    checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil)
+
+    spark.sql(
+      "CALL sys.expire_partitions(table => 'T', " +
+        "expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', max_expires => 3);")
+
+    // insert a new snapshot to clean up the expired partition's files
+    spark.sql("insert into T values ('Never-expire-2', '9999-09-09')")
+
+    val table = loadTable("T")
+    val fileIO = table.fileIO()
+    val tablePath = table.location()
+
+    val partitionValue = "pt=2024-06-01"
+    val partitionPath = tablePath + "/" + partitionValue
+    val orphanFile1 = new Path(partitionPath, ORPHAN_FILE_1)
+    val orphanFile2 = new Path(partitionPath, ORPHAN_FILE_2)
+    fileIO.writeFile(orphanFile1, "a", true)
+    Thread.sleep(2000)
+    fileIO.writeFile(orphanFile2, "b", true)
+
+    checkAnswer(
+      spark.sql("CALL paimon.sys.expire_snapshots(table => 'T', retain_max => 1)"),
+      Row(2) :: Nil)
+
+    val older_than1 = new java.sql.Timestamp(
+      fileIO.getFileStatus(orphanFile2).getModificationTime - TimeUnit.SECONDS.toMillis(1))
+
+    // partition 'pt=2024-06-01' still has one orphan file left
+    assertResult(true)(
+      fileIO
+        .listDirectories(tablePath)
+        .map(status => status.getPath.getName)
+        .contains(partitionValue))
+
+    checkAnswer(
+      spark.sql(
+        s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than1', mode => 'distributed')"),
+      Row(1, 1) :: Nil)
+
+    val older_than2 = new java.sql.Timestamp(System.currentTimeMillis())
+
+    checkAnswer(
+      spark.sql(
+        s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than2', mode => 'local')"),
+      Row(1, 1) :: Nil)
+
+    // partition 'pt=2024-06-01' has no orphan files left, so its empty directory is cleaned
+    assertResult(false)(
+      fileIO
+        .listDirectories(tablePath)
+        .map(status => status.getPath.getName)
+        .contains(partitionValue))
+  }
 }