@@ -52,6 +52,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
import static org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialReturn;
@@ -69,6 +70,8 @@ public class LocalOrphanFilesClean extends OrphanFilesClean {

private final List<Path> deleteFiles;

private final boolean dryRun;

private final AtomicLong deletedFilesLenInBytes = new AtomicLong(0);

private Set<String> candidateDeletes;
@@ -78,16 +81,20 @@ public LocalOrphanFilesClean(FileStoreTable table) {
}

public LocalOrphanFilesClean(FileStoreTable table, long olderThanMillis) {
this(table, olderThanMillis, path -> table.fileIO().deleteQuietly(path));
this(table, olderThanMillis, path -> table.fileIO().deleteQuietly(path), false);
}

public LocalOrphanFilesClean(
FileStoreTable table, long olderThanMillis, SerializableConsumer<Path> fileCleaner) {
FileStoreTable table,
long olderThanMillis,
SerializableConsumer<Path> fileCleaner,
boolean dryRun) {
super(table, olderThanMillis, fileCleaner);
this.deleteFiles = new ArrayList<>();
this.executor =
createCachedThreadPool(
table.coreOptions().deleteFileThreadNum(), "ORPHAN_FILES_CLEAN");
this.dryRun = dryRun;
}

public CleanOrphanFilesResult clean()
@@ -127,10 +134,32 @@ public CleanOrphanFilesResult clean()
.collect(Collectors.toList()));
candidateDeletes.clear();

// clean empty directories
if (!dryRun) {
cleanEmptyDataDirectory(deleteFiles);
}

return new CleanOrphanFilesResult(
deleteFiles.size(), deletedFilesLenInBytes.get(), deleteFiles);
}

private void cleanEmptyDataDirectory(List<Path> deleteFiles) {
if (deleteFiles.isEmpty()) {
return;
}
Set<Path> bucketDirs =
deleteFiles.stream()
.map(Path::getParent)
.filter(path -> path.toUri().toString().contains(BUCKET_PATH_PREFIX))
.collect(Collectors.toSet());
randomlyOnlyExecute(executor, this::tryDeleteEmptyDirectory, bucketDirs);

// Clean partition directories separately to avoid conflicts
Set<Path> partitionDirs =
bucketDirs.stream().map(Path::getParent).collect(Collectors.toSet());
tryCleanDataDirectory(partitionDirs, partitionKeysNum);
}

private void collectWithoutDataFile(
String branch, Consumer<String> usedFileConsumer, Consumer<String> manifestConsumer)
throws IOException {
@@ -211,7 +240,8 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(
@Nullable String tableName,
long olderThanMillis,
SerializableConsumer<Path> fileCleaner,
@Nullable Integer parallelism)
@Nullable Integer parallelism,
boolean dryRun)
throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
List<String> tableNames = Collections.singletonList(tableName);
if (tableName == null || "*".equals(tableName)) {
@@ -240,7 +270,7 @@ public static List<LocalOrphanFilesClean> createOrphanFilesCleans(

orphanFilesCleans.add(
new LocalOrphanFilesClean(
(FileStoreTable) table, olderThanMillis, fileCleaner));
(FileStoreTable) table, olderThanMillis, fileCleaner, dryRun));
}

return orphanFilesCleans;
@@ -252,7 +282,8 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
@Nullable String tableName,
long olderThanMillis,
SerializableConsumer<Path> fileCleaner,
@Nullable Integer parallelism)
@Nullable Integer parallelism,
boolean dryRun)
throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
List<LocalOrphanFilesClean> tableCleans =
createOrphanFilesCleans(
@@ -261,7 +292,8 @@ public static CleanOrphanFilesResult executeDatabaseOrphanFiles(
tableName,
olderThanMillis,
fileCleaner,
parallelism);
parallelism,
dryRun);

ExecutorService executorService =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
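For reference, a minimal sketch (not part of this PR) of driving the new four-argument constructor directly. It assumes an already-open Paimon Catalog and a hypothetical table default.T; the same dryRun flag is wired twice, as in the procedures below: once into createFileCleaner so the cleaner becomes a no-op, and once into LocalOrphanFilesClean so the new empty-directory cleanup is skipped.

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.operation.CleanOrphanFilesResult;
import org.apache.paimon.operation.LocalOrphanFilesClean;
import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.table.FileStoreTable;

public class DryRunCleanSketch {
    // Sketch only: the table name and timestamp are made up for illustration.
    public static CleanOrphanFilesResult preview(Catalog catalog) throws Exception {
        boolean dryRun = true;
        FileStoreTable table =
                (FileStoreTable) catalog.getTable(Identifier.create("default", "T"));
        LocalOrphanFilesClean clean =
                new LocalOrphanFilesClean(
                        table,
                        OrphanFilesClean.olderThanMillis("2024-06-01 00:00:00"),
                        OrphanFilesClean.createFileCleaner(catalog, dryRun),
                        dryRun);
        // With dryRun = true nothing is deleted; the result only reports what would be removed.
        return clean.clean();
    }
}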
@@ -59,6 +59,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
import static org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX;
@@ -390,4 +391,23 @@ public static long olderThanMillis(@Nullable String olderThan) {
return parsedTimestampData.getMillisecond();
}
}

/** Try to clean empty data directories. */
protected void tryCleanDataDirectory(Set<Path> dataDirs, int maxLevel) {
for (int level = 0; level < maxLevel; level++) {
dataDirs =
dataDirs.stream()
.filter(this::tryDeleteEmptyDirectory)
.map(Path::getParent)
.collect(Collectors.toSet());
}
}

public boolean tryDeleteEmptyDirectory(Path path) {
try {
return fileIO.delete(path, false);
} catch (IOException e) {
return false;
}
}
}
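The helpers above lean on the fact that a non-recursive delete only succeeds for an empty directory, so walking upward level by level can never remove data that is still referenced. A self-contained sketch of that behaviour (paths are made up; assumes Paimon's LocalFileIO.create() local-filesystem helper):

import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;

public class EmptyDirSketch {
    public static void main(String[] args) throws Exception {
        LocalFileIO fileIO = LocalFileIO.create();
        Path bucket = new Path("/tmp/paimon-empty-dir-demo/pt=2024-06-01/bucket-0");
        fileIO.mkdirs(bucket);

        // bucket-0 is empty, so the non-recursive delete succeeds ...
        System.out.println(fileIO.delete(bucket, false));             // true
        // ... which leaves pt=2024-06-01 empty, so the next level up can go as well
        System.out.println(fileIO.delete(bucket.getParent(), false)); // true
        // a directory that still holds live files fails the non-recursive delete,
        // which tryDeleteEmptyDirectory turns into a plain false
    }
}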
@@ -110,7 +110,8 @@ public String[] call(
tableName,
olderThanMillis(olderThan),
createFileCleaner(catalog, dryRun),
parallelism);
parallelism,
dryRun);
break;
default:
throw new IllegalArgumentException(
@@ -98,7 +98,8 @@ public String[] call(
tableName,
olderThanMillis(olderThan),
createFileCleaner(catalog, dryRun),
parallelism);
parallelism,
dryRun);
break;
default:
throw new IllegalArgumentException(
@@ -90,6 +90,10 @@ public StructType outputType() {
public InternalRow[] call(InternalRow args) {
org.apache.paimon.catalog.Identifier identifier;
String tableId = args.getString(0);
String olderThan = args.isNullAt(1) ? null : args.getString(1);
boolean dryRun = !args.isNullAt(2) && args.getBoolean(2);
Integer parallelism = args.isNullAt(3) ? null : args.getInt(3);

Preconditions.checkArgument(
tableId != null && !tableId.isEmpty(),
"Cannot handle an empty tableId for argument %s",
@@ -116,23 +120,21 @@ public InternalRow[] call(InternalRow args) {
catalog,
identifier.getDatabaseName(),
identifier.getTableName(),
OrphanFilesClean.olderThanMillis(
args.isNullAt(1) ? null : args.getString(1)),
OrphanFilesClean.createFileCleaner(
catalog, !args.isNullAt(2) && args.getBoolean(2)),
args.isNullAt(3) ? null : args.getInt(3));
OrphanFilesClean.olderThanMillis(olderThan),
OrphanFilesClean.createFileCleaner(catalog, dryRun),
parallelism,
dryRun);
break;
case "DISTRIBUTED":
cleanOrphanFilesResult =
SparkOrphanFilesClean.executeDatabaseOrphanFiles(
catalog,
identifier.getDatabaseName(),
identifier.getTableName(),
OrphanFilesClean.olderThanMillis(
args.isNullAt(1) ? null : args.getString(1)),
OrphanFilesClean.createFileCleaner(
catalog, !args.isNullAt(2) && args.getBoolean(2)),
args.isNullAt(3) ? null : args.getInt(3));
OrphanFilesClean.olderThanMillis(olderThan),
OrphanFilesClean.createFileCleaner(catalog, dryRun),
parallelism,
dryRun);
break;
default:
throw new IllegalArgumentException(
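End to end, the dry-run path can also be exercised through SQL. The hedged example below assumes a Spark session with the Paimon catalog and extensions configured, a table default.T, and that the procedure's arguments are named dry_run and mode as in Paimon's remove_orphan_files documentation.

import org.apache.spark.sql.SparkSession;

public class DryRunCallSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.active();
        // Reports orphan files older than the timestamp without deleting them
        // and without touching empty data directories.
        spark.sql(
                "CALL sys.remove_orphan_files("
                        + "table => 'default.T', "
                        + "older_than => '2024-06-02 00:00:00', "
                        + "dry_run => true, "
                        + "mode => 'distributed')")
                .show();
    }
}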
@@ -25,6 +25,7 @@ import org.apache.paimon.manifest.{ManifestEntry, ManifestFile}
import org.apache.paimon.operation.{CleanOrphanFilesResult, OrphanFilesClean}
import org.apache.paimon.operation.OrphanFilesClean.retryReadingFiles
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.utils.FileStorePathFactory.BUCKET_PATH_PREFIX
import org.apache.paimon.utils.SerializableConsumer

import org.apache.spark.internal.Logging
@@ -37,13 +38,15 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.function.Consumer

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

case class SparkOrphanFilesClean(
specifiedTable: FileStoreTable,
specifiedOlderThanMillis: Long,
specifiedFileCleaner: SerializableConsumer[Path],
parallelism: Int,
dryRun: Boolean,
@transient spark: SparkSession)
extends OrphanFilesClean(specifiedTable, specifiedOlderThanMillis, specifiedFileCleaner)
with SQLConfHelper
@@ -124,19 +127,23 @@ case class SparkOrphanFilesClean(
.flatMap {
dir =>
tryBestListingDirs(new Path(dir)).asScala.filter(oldEnough).map {
file => (file.getPath.getName, file.getPath.toUri.toString, file.getLen)
file =>
val path = file.getPath
(path.getName, path.toUri.toString, file.getLen, path.getParent.toUri.toString)
}
}
.toDF("name", "path", "len")
.toDF("name", "path", "len", "dataDir")
.repartition(parallelism)

// use a left anti join to filter out files that are not used
val deleted = candidates
.join(usedFiles, $"name" === $"used_name", "left_anti")
.repartition($"dataDir")
.mapPartitions {
it =>
var deletedFilesCount = 0L
var deletedFilesLenInBytes = 0L
val dataDirs = new mutable.HashSet[String]()

while (it.hasNext) {
val fileInfo = it.next();
Expand All @@ -145,12 +152,23 @@ case class SparkOrphanFilesClean(
deletedFilesLenInBytes += fileInfo.getLong(2)
specifiedFileCleaner.accept(deletedPath)
logInfo(s"Cleaned file: $pathToClean")
dataDirs.add(fileInfo.getString(3))
deletedFilesCount += 1
}

// clean empty directories
if (!dryRun) {
val bucketDirs = dataDirs
.filter(_.contains(BUCKET_PATH_PREFIX))
.map(new Path(_))
tryCleanDataDirectory(bucketDirs.asJava, partitionKeysNum + 1)
}

logInfo(
s"Total cleaned files: $deletedFilesCount, Total cleaned files len : $deletedFilesLenInBytes")
Iterator.single((deletedFilesCount, deletedFilesLenInBytes))
}

val finalDeletedDataset =
if (deletedFilesCountInLocal.get() != 0 || deletedFilesLenInBytesInLocal.get() != 0) {
deleted.union(
@@ -181,7 +199,8 @@ object SparkOrphanFilesClean extends SQLConfHelper {
tableName: String,
olderThanMillis: Long,
fileCleaner: SerializableConsumer[Path],
parallelismOpt: Integer): CleanOrphanFilesResult = {
parallelismOpt: Integer,
dryRun: Boolean): CleanOrphanFilesResult = {
val spark = SparkSession.active
val parallelism = if (parallelismOpt == null) {
Math.max(spark.sparkContext.defaultParallelism, conf.numShufflePartitions)
@@ -213,6 +232,7 @@ object SparkOrphanFilesClean extends SQLConfHelper {
olderThanMillis,
fileCleaner,
parallelism,
dryRun,
spark
).doOrphanClean()
}.unzip
@@ -240,4 +240,75 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than')"),
Row(1, 1) :: Nil)
}

test("Paimon procedure: clean empty directory after removing orphan files") {
spark.sql("""
|CREATE TABLE T (k STRING, pt STRING)
|using paimon TBLPROPERTIES ('primary-key'='k,pt', 'bucket'='1',
|'snapshot.clean-empty-directories'='false') PARTITIONED BY (pt);
|""".stripMargin)

spark.sql("""
|insert into T values
|("a", "2024-06-02"),("b", "2024-06-02"),("d", "2024-06-03"),
|("c", "2024-06-01"),("Never-expire", "9999-09-09");
|
|""".stripMargin)

// by default, no files are deleted
checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil)

spark.sql(
"CALL sys.expire_partitions(table => 'T' , " +
"expiration_time => '1 d', timestamp_formatter => 'yyyy-MM-dd', max_expires => 3);")

// insert a new snapshot so that the expired partition's files can be cleaned
spark.sql("insert into T values ('Never-expire-2', '9999-09-09')")

val table = loadTable("T")
val fileIO = table.fileIO()
val tablePath = table.location()

val partitionValue = "pt=2024-06-01"
val partitionPath = tablePath + "/" + partitionValue
val orphanFile1 = new Path(partitionPath, ORPHAN_FILE_1)
val orphanFile2 = new Path(partitionPath, ORPHAN_FILE_2)
fileIO.writeFile(orphanFile1, "a", true)
Thread.sleep(2000)
fileIO.writeFile(orphanFile2, "b", true)

checkAnswer(
spark.sql("CALL paimon.sys.expire_snapshots(table => 'T', retain_max => 1)"),
Row(2) :: Nil)

val older_than1 = new java.sql.Timestamp(
fileIO.getFileStatus(orphanFile2).getModificationTime - TimeUnit.SECONDS.toMillis(1))

// partition 'pt=2024-06-01' has one orphan file left
assertResult(true)(
fileIO
.listDirectories(tablePath)
.map(status => status.getPath.getName)
.contains(partitionValue))

checkAnswer(
spark.sql(
s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than1', mode => 'distributed')"),
Row(1, 1) :: Nil)

val older_than2 = new java.sql.Timestamp(System.currentTimeMillis())

checkAnswer(
spark.sql(
s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than2', mode => 'local')"),
Row(1, 1) :: Nil)

// partition 'pt=2024-06-01' has no orphan files left, so its empty directory is cleaned
assertResult(false)(
fileIO
.listDirectories(tablePath)
.map(status => status.getPath.getName)
.contains(partitionValue))
}

}