Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.BranchManager;
import org.apache.paimon.utils.DateTimeUtils;
import org.apache.paimon.utils.FileStorePathFactory;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.SerializableConsumer;
Expand Down Expand Up @@ -252,12 +253,14 @@ protected void collectWithoutDataFileWithManifestFlag(

/** List directories that contains data files and manifest files. */
protected List<Path> listPaimonFileDirs() {
FileStorePathFactory pathFactory = table.store().pathFactory();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix? Could you provide a detailed description of the bug?

Copy link
Contributor Author

@Zouxxyy Zouxxyy Jan 9, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix? Could you provide a detailed description of the bug?

See line 263, pathFactory.dataFilePath()


List<Path> paimonFileDirs = new ArrayList<>();

paimonFileDirs.add(new Path(location, "manifest"));
paimonFileDirs.add(new Path(location, "index"));
paimonFileDirs.add(new Path(location, "statistics"));
paimonFileDirs.addAll(listFileDirs(location, partitionKeysNum));
paimonFileDirs.add(pathFactory.manifestPath());
paimonFileDirs.add(pathFactory.indexPath());
paimonFileDirs.add(pathFactory.statisticsPath());
paimonFileDirs.addAll(listFileDirs(pathFactory.dataFilePath(), partitionKeysNum));

return paimonFileDirs;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@
@ThreadSafe
public class FileStorePathFactory {

public static final String MANIFEST_PATH = "manifest";
public static final String MANIFEST_PREFIX = "manifest-";
public static final String MANIFEST_LIST_PREFIX = "manifest-list-";
public static final String INDEX_MANIFEST_PREFIX = "index-manifest-";

public static final String INDEX_PATH = "index";
public static final String INDEX_PREFIX = "index-";

public static final String STATISTICS_PATH = "statistics";
public static final String STATISTICS_PREFIX = "stat-";

Comment on lines +40 to +50
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

private

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think both public and private are fine, and the BUCKET_PATH_PREFIX is also public.

public static final String BUCKET_PATH_PREFIX = "bucket-";

// this is the table schema root path
Expand Down Expand Up @@ -94,6 +105,25 @@ public Path root() {
return root;
}

public Path manifestPath() {
return new Path(root, MANIFEST_PATH);
}

public Path indexPath() {
return new Path(root, INDEX_PATH);
}

public Path statisticsPath() {
return new Path(root, STATISTICS_PATH);
}

public Path dataFilePath() {
if (dataFilePathDirectory != null) {
return new Path(root, dataFilePathDirectory);
}
return root;
}

@VisibleForTesting
public static InternalRowPartitionComputer getPartitionComputer(
RowType partitionType, String defaultPartValue, boolean legacyPartitionName) {
Expand All @@ -103,25 +133,21 @@ public static InternalRowPartitionComputer getPartitionComputer(
}

public Path newManifestFile() {
return new Path(
root + "/manifest/manifest-" + uuid + "-" + manifestFileCount.getAndIncrement());
return toManifestFilePath(
MANIFEST_PREFIX + uuid + "-" + manifestFileCount.getAndIncrement());
}

public Path newManifestList() {
return new Path(
root
+ "/manifest/manifest-list-"
+ uuid
+ "-"
+ manifestListCount.getAndIncrement());
return toManifestListPath(
MANIFEST_LIST_PREFIX + uuid + "-" + manifestListCount.getAndIncrement());
}

public Path toManifestFilePath(String manifestFileName) {
return new Path(root + "/manifest/" + manifestFileName);
return new Path(manifestPath(), manifestFileName);
}

public Path toManifestListPath(String manifestListName) {
return new Path(root + "/manifest/" + manifestListName);
return new Path(manifestPath(), manifestListName);
}

public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bucket) {
Expand Down Expand Up @@ -217,17 +243,13 @@ public PathFactory indexManifestFileFactory() {
return new PathFactory() {
@Override
public Path newPath() {
return new Path(
root
+ "/manifest/index-manifest-"
+ uuid
+ "-"
+ indexManifestCount.getAndIncrement());
return toPath(
INDEX_MANIFEST_PREFIX + uuid + "-" + indexManifestCount.getAndIncrement());
}

@Override
public Path toPath(String fileName) {
return new Path(root + "/manifest/" + fileName);
return new Path(manifestPath(), fileName);
}
};
}
Expand All @@ -236,13 +258,12 @@ public PathFactory indexFileFactory() {
return new PathFactory() {
@Override
public Path newPath() {
return new Path(
root + "/index/index-" + uuid + "-" + indexFileCount.getAndIncrement());
return toPath(INDEX_PREFIX + uuid + "-" + indexFileCount.getAndIncrement());
}

@Override
public Path toPath(String fileName) {
return new Path(root + "/index/" + fileName);
return new Path(indexPath(), fileName);
}
};
}
Expand All @@ -251,17 +272,12 @@ public PathFactory statsFileFactory() {
return new PathFactory() {
@Override
public Path newPath() {
return new Path(
root
+ "/statistics/stats-"
+ uuid
+ "-"
+ statsFileCount.getAndIncrement());
return toPath(STATISTICS_PREFIX + uuid + "-" + statsFileCount.getAndIncrement());
}

@Override
public Path toPath(String fileName) {
return new Path(root + "/statistics/" + fileName);
return new Path(statisticsPath(), fileName);
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,25 @@ class RemoveOrphanFilesProcedureTest extends PaimonSparkTestBase {
checkAnswer(spark.sql(s"CALL sys.remove_orphan_files(table => 'T')"), Row(0, 0) :: Nil)
}

test("Paimon procedure: remove orphan files with data file path directory") {
sql(s"""
|CREATE TABLE T (id STRING, name STRING)
|USING PAIMON
|TBLPROPERTIES ('primary-key'='id', 'data-file.path-directory'='data')
|""".stripMargin)

sql(s"INSERT INTO T VALUES ('1', 'a'), ('2', 'b')")

val table = loadTable("T")
val orphanFile = new Path(table.store().pathFactory().dataFilePath(), ORPHAN_FILE_1)
table.fileIO().tryToWriteAtomic(orphanFile, "b")

Thread.sleep(1000)
val older_than = DateTimeUtils.formatLocalDateTime(
DateTimeUtils.toLocalDateTime(System.currentTimeMillis()),
3)
checkAnswer(
sql(s"CALL sys.remove_orphan_files(table => 'T', older_than => '$older_than')"),
Row(1, 1) :: Nil)
}
}
Loading