Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -368,9 +368,14 @@ public String fileFormat() {
return split[split.length - 1];
}

@Nullable
public String externalPath() {
return externalPath;
public Optional<String> externalPath() {
return Optional.ofNullable(externalPath);
}

public Optional<String> externalPathDir() {
return Optional.ofNullable(externalPath)
.map(Path::new)
.map(p -> p.getParent().toUri().toString());
}

public Optional<FileSource> fileSource() {
Expand Down Expand Up @@ -405,7 +410,8 @@ public DataFileMeta upgrade(int newLevel) {
externalPath);
}

public DataFileMeta rename(String newExternalPath, String newFileName) {
public DataFileMeta rename(String newFileName) {
String newExternalPath = externalPathDir().map(dir -> dir + "/" + newFileName).orElse(null);
return new DataFileMeta(
newFileName,
fileSize,
Expand Down Expand Up @@ -452,7 +458,7 @@ public DataFileMeta copyWithoutStats() {
public List<Path> collectFiles(DataFilePathFactory pathFactory) {
List<Path> paths = new ArrayList<>();
paths.add(pathFactory.toPath(this));
extraFiles.forEach(f -> paths.add(pathFactory.toExtraFilePath(this, f)));
extraFiles.forEach(f -> paths.add(pathFactory.toAlignedPath(f, this)));
return paths;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public InternalRow toRow(DataFileMeta meta) {
meta.embeddedIndex(),
meta.fileSource().map(FileSource::toByteValue).orElse(null),
toStringArrayData(meta.valueStatsCols()),
BinaryString.fromString(meta.externalPath()));
meta.externalPath().map(BinaryString::fromString).orElse(null));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.FileEntry;

import javax.annotation.concurrent.ThreadSafe;

import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -67,47 +69,36 @@ public Path newChangelogPath() {
return newPath(changelogFilePrefix);
}

private Path newPath(String prefix) {
public String newChangelogFileName() {
return newFileName(changelogFilePrefix);
}

public Path newPath(String prefix) {
return new Path(parent, newFileName(prefix));
}

private String newFileName(String prefix) {
String extension;
if (fileSuffixIncludeCompression) {
extension = "." + fileCompression + "." + formatIdentifier;
} else {
extension = "." + formatIdentifier;
}
String name = prefix + uuid + "-" + pathCount.getAndIncrement() + extension;
return new Path(parent, name);
return prefix + uuid + "-" + pathCount.getAndIncrement() + extension;
}

@VisibleForTesting
public Path toPath(String fileName) {
return new Path(parent + "/" + fileName);
}

/**
* for read purpose.
*
* @param fileName the file name
* @param externalPath the external path, if null, it will use the parent path
* @return the file's path
*/
public Path toPath(String fileName, String externalPath) {
return new Path((externalPath == null ? parent : externalPath) + "/" + fileName);
public Path toPath(DataFileMeta file) {
return file.externalPath().map(Path::new).orElse(new Path(parent, file.fileName()));
}

public Path toPath(DataFileMeta dataFileMeta) {
String externalPath = dataFileMeta.externalPath();
String fileName = dataFileMeta.fileName();
return new Path((externalPath == null ? parent : externalPath) + "/" + fileName);
public Path toPath(FileEntry file) {
return Optional.ofNullable(file.externalPath())
.map(Path::new)
.orElse(new Path(parent, file.fileName()));
}

public Path toExtraFilePath(DataFileMeta dataFileMeta, String extraFile) {
String externalPath = dataFileMeta.externalPath();
return new Path((externalPath == null ? parent : externalPath) + "/" + extraFile);
}

@VisibleForTesting
public String uuid() {
return uuid;
public Path toAlignedPath(String fileName, DataFileMeta aligned) {
return new Path(aligned.externalPathDir().map(Path::new).orElse(parent), fileName);
}

public static Path dataFileToFileIndexPath(Path dataFilePath) {
Expand Down Expand Up @@ -141,4 +132,9 @@ public static String formatIdentifier(String fileName) {

return fileName.substring(index + 1);
}

@VisibleForTesting
String uuid() {
return uuid;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static FileIndexResult evaluate(
// go to file index check
try (FileIndexPredicate predicate =
new FileIndexPredicate(
dataFilePathFactory.toExtraFilePath(file, indexFiles.get(0)),
dataFilePathFactory.toAlignedPath(indexFiles.get(0), file),
fileIO,
dataSchema.logicalRowType())) {
return predicate.evaluate(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader;
Expand Down Expand Up @@ -98,37 +97,17 @@ private KeyValueFileReaderFactory(

@Override
public RecordReader<KeyValue> createRecordReader(DataFileMeta file) throws IOException {
return createRecordReader(
file.schemaId(),
file.fileName(),
file.fileSize(),
file.level(),
file.externalPath());
}

@VisibleForTesting
public RecordReader<KeyValue> createRecordReader(
long schemaId, String fileName, long fileSize, int level, String externalPath)
throws IOException {
if (fileSize >= asyncThreshold && fileName.endsWith(".orc")) {
return new AsyncRecordReader<>(
() ->
createRecordReader(
schemaId, fileName, level, false, 2, fileSize, externalPath));
if (file.fileSize() >= asyncThreshold && file.fileName().endsWith(".orc")) {
return new AsyncRecordReader<>(() -> createRecordReader(file, false, 2));
}
return createRecordReader(schemaId, fileName, level, true, null, fileSize, externalPath);
return createRecordReader(file, true, null);
}

private FileRecordReader<KeyValue> createRecordReader(
long schemaId,
String fileName,
int level,
boolean reuseFormat,
@Nullable Integer orcPoolSize,
long fileSize,
String externalPath)
DataFileMeta file, boolean reuseFormat, @Nullable Integer orcPoolSize)
throws IOException {
String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName);
String formatIdentifier = DataFilePathFactory.formatIdentifier(file.fileName());
long schemaId = file.schemaId();

Supplier<FormatReaderMapping> formatSupplier =
() ->
Expand All @@ -143,8 +122,9 @@ private FileRecordReader<KeyValue> createRecordReader(
new FormatKey(schemaId, formatIdentifier),
key -> formatSupplier.get())
: formatSupplier.get();
Path filePath = pathFactory.toPath(fileName, externalPath);
Path filePath = pathFactory.toPath(file);

long fileSize = file.fileSize();
FileRecordReader<InternalRow> fileRecordReader =
new DataFileRecordReader(
formatReaderMapping.getReaderFactory(),
Expand All @@ -156,13 +136,13 @@ private FileRecordReader<KeyValue> createRecordReader(
formatReaderMapping.getCastMapping(),
PartitionUtils.create(formatReaderMapping.getPartitionPair(), partition));

Optional<DeletionVector> deletionVector = dvFactory.create(fileName);
Optional<DeletionVector> deletionVector = dvFactory.create(file.fileName());
if (deletionVector.isPresent() && !deletionVector.get().isEmpty()) {
fileRecordReader =
new ApplyDeletionVectorReader(fileRecordReader, deletionVector.get());
}

return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, level);
return new KeyValueDataFileRecordReader(fileRecordReader, keyType, valueType, file.level());
}

public static Builder builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,23 +142,22 @@ private KeyValueDataFileWriter createDataFileWriter(
fileIndexOptions);
}

public void deleteFile(DataFileMeta meta, int level) {
fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(meta));
public void deleteFile(DataFileMeta file) {
fileIO.deleteQuietly(formatContext.pathFactory(file.level()).toPath(file));
}

public void copyFile(DataFileMeta sourceMeta, DataFileMeta targetMeta, int level)
throws IOException {
Path sourcePath = formatContext.pathFactory(level).toPath(sourceMeta);
Path targetPath = formatContext.pathFactory(level).toPath(targetMeta);
public void copyFile(DataFileMeta sourceFile, DataFileMeta targetFile) throws IOException {
Path sourcePath = formatContext.pathFactory(sourceFile.level()).toPath(sourceFile);
Path targetPath = formatContext.pathFactory(targetFile.level()).toPath(targetFile);
fileIO.copyFile(sourcePath, targetPath, true);
}

public FileIO getFileIO() {
return fileIO;
}

public Path newChangelogPath(int level) {
return formatContext.pathFactory(level).newChangelogPath();
public String newChangelogFileName(int level) {
return formatContext.pathFactory(level).newChangelogFileName();
}

public static Builder builder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public interface FileEntry {

String fileName();

@Nullable
String externalPath();

Identifier identifier();
Expand Down Expand Up @@ -161,7 +162,9 @@ public String toString(FileStorePathFactory pathFactory) {
+ ", extraFiles "
+ extraFiles
+ ", embeddedIndex "
+ Arrays.toString(embeddedIndex);
+ Arrays.toString(embeddedIndex)
+ ", externalPath "
+ externalPath;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TinyIntType;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -92,9 +94,10 @@ public String fileName() {
return file.fileName();
}

@Nullable
@Override
public String externalPath() {
return file.externalPath();
return file.externalPath().orElse(null);
}

@Override
Expand Down Expand Up @@ -129,7 +132,7 @@ public Identifier identifier() {
file.fileName(),
file.extraFiles(),
file.embeddedIndex(),
file.externalPath());
externalPath());
}

public ManifestEntry copyWithoutStats() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public String fileName() {
return fileName;
}

@Nullable
@Override
public String externalPath() {
return externalPath;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.paimon.compression.CompressOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
Expand Down Expand Up @@ -242,10 +241,9 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul
} else if (changelogProducer == ChangelogProducer.INPUT && isInsertOnly) {
List<DataFileMeta> changelogMetas = new ArrayList<>();
for (DataFileMeta dataMeta : dataMetas) {
Path newPath = writerFactory.newChangelogPath(0);
DataFileMeta changelogMeta =
dataMeta.rename(newPath.getParent().getName(), newPath.getName());
writerFactory.copyFile(dataMeta, changelogMeta, 0);
String newFileName = writerFactory.newChangelogFileName(0);
DataFileMeta changelogMeta = dataMeta.rename(newFileName);
writerFactory.copyFile(dataMeta, changelogMeta);
changelogMetas.add(changelogMeta);
}
newFilesChangelog.addAll(changelogMetas);
Expand Down Expand Up @@ -343,7 +341,7 @@ private void updateCompactResult(CompactResult result) {
// 2. This file is not the input of upgraded.
if (!compactBefore.containsKey(file.fileName())
&& !afterFiles.contains(file.fileName())) {
writerFactory.deleteFile(file, file.level());
writerFactory.deleteFile(file);
}
} else {
compactBefore.put(file.fileName(), file);
Expand Down Expand Up @@ -377,7 +375,7 @@ public void close() throws Exception {
deletedFiles.clear();

for (DataFileMeta file : newFilesChangelog) {
writerFactory.deleteFile(file, file.level());
writerFactory.deleteFile(file);
}
newFilesChangelog.clear();

Expand All @@ -392,12 +390,12 @@ public void close() throws Exception {
compactAfter.clear();

for (DataFileMeta file : compactChangelog) {
writerFactory.deleteFile(file, file.level());
writerFactory.deleteFile(file);
}
compactChangelog.clear();

for (DataFileMeta file : delete) {
writerFactory.deleteFile(file, file.level());
writerFactory.deleteFile(file);
}

if (compactDeletionFile != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,8 @@ public Optional<List<RawFile>> convertToRawFiles() {
}

private RawFile makeRawTableFile(String bucketPath, DataFileMeta file) {
String path = file.externalPath() != null ? file.externalPath() : bucketPath;
path += "/" + file.fileName();
return new RawFile(
path,
file.externalPath().orElse(bucketPath + "/" + file.fileName()),
file.fileSize(),
0,
file.fileSize(),
Expand Down
Loading
Loading