@@ -41,6 +41,7 @@
 import org.apache.paimon.iceberg.metadata.IcebergSnapshot;
 import org.apache.paimon.iceberg.metadata.IcebergSnapshotSummary;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.options.Options;
@@ -446,11 +447,16 @@ private boolean collectFileChanges(
             Map<String, BinaryRow> removedFiles,
             Map<String, Pair<BinaryRow, DataFileMeta>> addedFiles) {
         boolean isAddOnly = true;
+        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>();
         for (ManifestEntry entry : manifestEntries) {
-            String path =
-                    fileStorePathFactory.bucketPath(entry.partition(), entry.bucket())
-                            + "/"
-                            + entry.fileName();
+            Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), entry.bucket());
+            DataFilePathFactory dataFilePathFactory =
+                    dataFilePathFactoryMap.computeIfAbsent(
+                            bucket,
+                            b ->
+                                    fileStorePathFactory.createDataFilePathFactory(
+                                            entry.partition(), entry.bucket()));
+            String path = dataFilePathFactory.toPath(entry).toString();
             switch (entry.kind()) {
                 case ADD:
                     if (shouldAddFileToIceberg(entry.file())) {
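The recurring change in this PR is that data-file paths are no longer built by concatenating the bucket directory with the file name; instead a DataFilePathFactory, cached per partition/bucket, resolves each entry, so files written to external data paths keep their real locations. A minimal standalone sketch of that difference, using hypothetical paths and a simplified entry type rather than Paimon's ManifestEntry/DataFileMeta classes, could look like this:

```java
import java.util.Optional;

/** Simplified stand-in for a manifest entry; real Paimon entries carry an optional external path. */
class ExampleEntry {
    final String fileName;
    final Optional<String> externalPath; // full location on external storage, if configured

    ExampleEntry(String fileName, Optional<String> externalPath) {
        this.fileName = fileName;
        this.externalPath = externalPath;
    }
}

public class PathResolutionSketch {
    public static void main(String[] args) {
        String bucketDir = "file:/warehouse/db.db/t/pt=1/bucket-0"; // assumed table-local bucket dir
        ExampleEntry local = new ExampleEntry("data-1.orc", Optional.empty());
        ExampleEntry external =
                new ExampleEntry(
                        "data-2.orc", Optional.of("oss://my-bucket/ext/pt=1/bucket-0/data-2.orc"));

        // Old style: always assumes the file lives under the table's bucket directory.
        System.out.println(bucketDir + "/" + external.fileName); // wrong for externally stored files

        // New style: prefer the recorded external location, fall back to the bucket directory.
        System.out.println(resolve(bucketDir, local));    // .../bucket-0/data-1.orc
        System.out.println(resolve(bucketDir, external)); // oss://my-bucket/ext/pt=1/bucket-0/data-2.orc
    }

    static String resolve(String bucketDir, ExampleEntry entry) {
        return entry.externalPath.orElse(bucketDir + "/" + entry.fileName);
    }
}
```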
@@ -110,6 +110,14 @@ public Path toAlignedPath(String fileName, DataFileMeta aligned) {
         return new Path(aligned.externalPathDir().map(Path::new).orElse(parent), fileName);
     }
 
+    public Path toAlignedPath(String fileName, FileEntry aligned) {
+        Optional<String> externalPathDir =
+                Optional.ofNullable(aligned.externalPath())
+                        .map(Path::new)
+                        .map(p -> p.getParent().toUri().toString());
+        return new Path(externalPathDir.map(Path::new).orElse(parent), fileName);
+    }
+
     public static Path dataFileToFileIndexPath(Path dataFilePath) {
         return new Path(dataFilePath.getParent(), dataFilePath.getName() + INDEX_PATH_SUFFIX);
     }
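The new FileEntry overload places "aligned" extra files (for example file indexes) next to the data file they belong to: it takes the parent directory of the entry's external path when one is recorded, otherwise the factory's bucket directory. A small standalone illustration of that parent-directory fallback, using hypothetical paths and plain strings instead of Paimon's Path and FileEntry types:

```java
import java.util.Optional;

public class AlignedPathSketch {
    // Mirrors the idea of toAlignedPath(String, FileEntry): use the parent dir of the
    // entry's external path if present, otherwise the bucket directory.
    static String toAlignedPath(String fileName, Optional<String> externalPath, String bucketDir) {
        String dir =
                externalPath
                        .map(p -> p.substring(0, p.lastIndexOf('/'))) // parent of the data file
                        .orElse(bucketDir);
        return dir + "/" + fileName;
    }

    public static void main(String[] args) {
        String bucketDir = "file:/warehouse/db.db/t/pt=1/bucket-0";
        Optional<String> externalDataFile =
                Optional.of("oss://my-bucket/ext/pt=1/bucket-0/data-2.orc");

        // The extra file follows its data file onto external storage ...
        System.out.println(toAlignedPath("index-2", externalDataFile, bucketDir));
        // ... and stays in the bucket directory when there is no external path.
        System.out.println(toAlignedPath("index-1", Optional.empty(), bucketDir));
    }
}
```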
@@ -24,6 +24,7 @@
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.manifest.ExpireFileEntry;
 import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.FileEntry.Identifier;
@@ -216,17 +217,24 @@ protected void getDataFileToDelete(
             List<ExpireFileEntry> dataFileEntries) {
         // we cannot delete a data file directly when we meet a DELETE entry, because that
         // file might be upgraded
+        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>();
         for (ExpireFileEntry entry : dataFileEntries) {
-            Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket());
-            Path dataFilePath = new Path(bucketPath, entry.fileName());
+            Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), entry.bucket());
+            DataFilePathFactory dataFilePathFactory =
+                    dataFilePathFactoryMap.computeIfAbsent(
+                            bucket,
Review comment (Contributor): I think bucket -> partitionAndBucket is better?
+                            b ->
+                                    pathFactory.createDataFilePathFactory(
+                                            entry.partition(), entry.bucket()));
+            Path dataFilePath = dataFilePathFactory.toPath(entry);
             switch (entry.kind()) {
                 case ADD:
                     dataFileToDelete.remove(dataFilePath);
                     break;
                 case DELETE:
                     List<Path> extraFiles = new ArrayList<>(entry.extraFiles().size());
                     for (String file : entry.extraFiles()) {
-                        extraFiles.add(new Path(bucketPath, file));
+                        extraFiles.add(dataFilePathFactory.toAlignedPath(file, entry));
                     }
                     dataFileToDelete.put(dataFilePath, Pair.of(entry, extraFiles));
                     break;
@@ -259,12 +267,17 @@ public void deleteAddedDataFiles(String manifestListName) {
 
     private void deleteAddedDataFiles(List<ExpireFileEntry> manifestEntries) {
         List<Path> dataFileToDelete = new ArrayList<>();
+        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>();
Review comment (Contributor): Extract a common util method? This logic has appeared multiple times.
Reply (Contributor): I will do this.
         for (ExpireFileEntry entry : manifestEntries) {
+            Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), entry.bucket());
+            DataFilePathFactory dataFilePathFactory =
+                    dataFilePathFactoryMap.computeIfAbsent(
+                            bucket,
+                            b ->
+                                    pathFactory.createDataFilePathFactory(
+                                            entry.partition(), entry.bucket()));
             if (entry.kind() == FileKind.ADD) {
-                dataFileToDelete.add(
-                        new Path(
-                                pathFactory.bucketPath(entry.partition(), entry.bucket()),
-                                entry.fileName()));
+                dataFileToDelete.add(dataFilePathFactory.toPath(entry));
                 recordDeletionBuckets(entry);
             }
         }
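The review thread above points out that this DataFilePathFactory caching now appears in several places. A minimal sketch of the kind of shared helper being discussed, with hypothetical names and a generic key standing in for Pair<BinaryRow, Integer> (this is not code from the PR or from Paimon's API), might be:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical per-key cache capturing the repeated computeIfAbsent pattern. In Paimon the key
 * would be the partition/bucket pair and the value a DataFilePathFactory created via the
 * FileStorePathFactory.
 */
final class PerKeyFactoryCache<K, V> {
    private final Map<K, V> cache = new HashMap<>();
    private final Function<K, V> creator;

    PerKeyFactoryCache(Function<K, V> creator) {
        this.creator = creator;
    }

    V get(K partitionAndBucket) {
        // Create the factory on first access for this partition/bucket, then reuse it.
        return cache.computeIfAbsent(partitionAndBucket, creator);
    }
}
```

Callers would then replace the inline map plus computeIfAbsent with a single get(partitionAndBucket) call, which also fits the earlier naming suggestion of partitionAndBucket over bucket.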
@@ -97,7 +97,7 @@ public CleanOrphanFilesResult clean()
         // specially handle to clear snapshot dir
         cleanSnapshotDir(branches, deleteFiles::add, deletedFilesLenInBytes::addAndGet);
 
-        // delete candidate files
+        // get candidate files
         Map<String, Pair<Path, Long>> candidates = getCandidateDeletingFiles();
         if (candidates.isEmpty()) {
             return new CleanOrphanFilesResult(
@@ -262,6 +262,14 @@ protected List<Path> listPaimonFileDirs() {
         paimonFileDirs.add(pathFactory.statisticsPath());
         paimonFileDirs.addAll(listFileDirs(pathFactory.dataFilePath(), partitionKeysNum));
 
+        // add external data paths
+        String dataFileExternalPaths = table.store().options().dataFileExternalPaths();
+        if (dataFileExternalPaths != null) {
+            String[] externalPathArr = dataFileExternalPaths.split(",");
+            for (String externalPath : externalPathArr) {
+                paimonFileDirs.addAll(listFileDirs(new Path(externalPath), partitionKeysNum));
+            }
+        }
         return paimonFileDirs;
     }
 
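Orphan-file cleaning now also walks the table's external data paths, obtained from the table options as a comma-separated list of roots. A standalone sketch of that branch; the option value and table location below are assumptions for illustration, not taken from the PR:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ExternalPathsSketch {
    public static void main(String[] args) {
        // Hypothetical value of the external-paths table option: several roots, comma separated.
        String dataFileExternalPaths = "oss://bucket-a/paimon/ext1,s3://bucket-b/paimon/ext2";

        List<String> rootsToScan = new ArrayList<>();
        rootsToScan.add("file:/warehouse/db.db/t"); // the table's own location is always scanned
        if (dataFileExternalPaths != null) {
            // Same split-and-add shape as the new branch in listPaimonFileDirs().
            rootsToScan.addAll(Arrays.asList(dataFileExternalPaths.split(",")));
        }

        // Orphan cleaning would list partition/bucket directories under every root, so data
        // files written to external paths are considered instead of being missed.
        rootsToScan.forEach(System.out::println);
    }
}
```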
@@ -23,11 +23,13 @@
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileHandler;
+import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.manifest.ExpireFileEntry;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestList;
 import org.apache.paimon.stats.StatsFileHandler;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Pair;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,12 +80,19 @@ public void cleanUnusedDataFiles(Snapshot taggedSnapshot, Predicate<ExpireFileEn
         }
 
         Set<Path> dataFileToDelete = new HashSet<>();
+        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>();
         for (ExpireFileEntry entry : manifestEntries) {
+            Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), entry.bucket());
+            DataFilePathFactory dataFilePathFactory =
+                    dataFilePathFactoryMap.computeIfAbsent(
+                            bucket,
+                            b ->
+                                    pathFactory.createDataFilePathFactory(
+                                            entry.partition(), entry.bucket()));
             if (!skipper.test(entry)) {
-                Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket());
-                dataFileToDelete.add(new Path(bucketPath, entry.fileName()));
+                dataFileToDelete.add(dataFilePathFactory.toPath(entry));
                 for (String file : entry.extraFiles()) {
-                    dataFileToDelete.add(new Path(bucketPath, file));
+                    dataFileToDelete.add(dataFilePathFactory.toAlignedPath(file, entry));
                 }
 
                 recordDeletionBuckets(entry);
paimon-core/src/test/java/org/apache/paimon/TestFileStore.java (53 changes: 37 additions & 16 deletions)
@@ -25,6 +25,7 @@
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.index.IndexFileMeta;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
 import org.apache.paimon.io.IndexIncrement;
 import org.apache.paimon.manifest.FileEntry;
 import org.apache.paimon.manifest.FileKind;
@@ -58,6 +59,7 @@
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.CommitIncrement;
 import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.RecordWriter;
 import org.apache.paimon.utils.SnapshotManager;
 import org.apache.paimon.utils.TagManager;
@@ -643,11 +645,17 @@ private static Set<Path> getSnapshotFileInUse(
                         .flatMap(m -> manifestFile.read(m.fileName()).stream())
                         .collect(Collectors.toList());
         entries = new ArrayList<>(FileEntry.mergeEntries(entries));
+        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>();
+
         for (ManifestEntry entry : entries) {
-            result.add(
-                    new Path(
-                            pathFactory.bucketPath(entry.partition(), entry.bucket()),
-                            entry.file().fileName()));
+            Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), entry.bucket());
+            DataFilePathFactory dataFilePathFactory =
+                    dataFilePathFactoryMap.computeIfAbsent(
+                            bucket,
+                            b ->
+                                    pathFactory.createDataFilePathFactory(
+                                            entry.partition(), entry.bucket()));
+            result.add(dataFilePathFactory.toPath(entry));
         }
 
         // Add 'DELETE' 'APPEND' file in snapshot
@@ -666,10 +674,14 @@
                 if (entry.kind() == FileKind.DELETE
                         && entry.file().fileSource().orElse(FileSource.APPEND)
                                 == FileSource.APPEND) {
-                    result.add(
-                            new Path(
-                                    pathFactory.bucketPath(entry.partition(), entry.bucket()),
-                                    entry.file().fileName()));
+                    Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), entry.bucket());
+                    DataFilePathFactory dataFilePathFactory =
+                            dataFilePathFactoryMap.computeIfAbsent(
+                                    bucket,
+                                    b ->
+                                            pathFactory.createDataFilePathFactory(
+                                                    entry.partition(), entry.bucket()));
+                    result.add(dataFilePathFactory.toPath(entry));
                 }
             }
         }
@@ -694,6 +706,7 @@ private static Set<Path> getChangelogFileInUse(
         // changelog file
         result.add(changelogPath);
 
+        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> dataFilePathFactoryMap = new HashMap<>();
         // data file
         // not all manifests contains useful data file
         // (1) produceChangelog = 'true': data file in changelog manifests
@@ -716,10 +729,14 @@
                             .collect(Collectors.toList());
             for (ManifestEntry entry : files) {
                 if (entry.file().fileSource().orElse(FileSource.APPEND) == FileSource.APPEND) {
-                    result.add(
-                            new Path(
-                                    pathFactory.bucketPath(entry.partition(), entry.bucket()),
-                                    entry.file().fileName()));
+                    Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), entry.bucket());
+                    DataFilePathFactory dataFilePathFactory =
+                            dataFilePathFactoryMap.computeIfAbsent(
+                                    bucket,
+                                    b ->
+                                            pathFactory.createDataFilePathFactory(
+                                                    entry.partition(), entry.bucket()));
+                    result.add(dataFilePathFactory.toPath(entry));
                 }
             }
         } else if (changelog.changelogManifestList() != null) {
@@ -731,10 +748,14 @@
                             .flatMap(m -> manifestFile.read(m.fileName()).stream())
                             .collect(Collectors.toList());
             for (ManifestEntry entry : files) {
-                result.add(
-                        new Path(
-                                pathFactory.bucketPath(entry.partition(), entry.bucket()),
-                                entry.file().fileName()));
+                Pair<BinaryRow, Integer> bucket = Pair.of(entry.partition(), entry.bucket());
+                DataFilePathFactory dataFilePathFactory =
+                        dataFilePathFactoryMap.computeIfAbsent(
+                                bucket,
+                                b ->
+                                        pathFactory.createDataFilePathFactory(
+                                                entry.partition(), entry.bucket()));
+                result.add(dataFilePathFactory.toPath(entry));
             }
         }
         return result;