From 0bb98830c994b9950e62c38ee2f5baf1b6cb5498 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Mon, 23 Dec 2024 21:19:16 +0800 Subject: [PATCH 1/9] support read external path in DataFileMeta --- .../paimon/append/AppendOnlyWriter.java | 4 ++-- .../org/apache/paimon/io/DataFileMeta.java | 4 ++-- .../apache/paimon/io/DataFilePathFactory.java | 14 +++++++++++++ .../paimon/io/KeyValueFileReaderFactory.java | 21 +++++++++++++------ .../paimon/io/KeyValueFileWriterFactory.java | 12 +++++++++++ .../paimon/manifest/ExpireFileEntry.java | 18 +++++++++++++--- .../org/apache/paimon/manifest/FileEntry.java | 2 ++ .../apache/paimon/manifest/ManifestEntry.java | 5 +++++ .../paimon/manifest/SimpleFileEntry.java | 21 +++++++++++++++---- .../manifest/SimpleFileEntrySerializer.java | 3 ++- .../paimon/mergetree/MergeTreeWriter.java | 14 ++++++++----- .../paimon/operation/FileStoreCommitImpl.java | 2 +- .../paimon/operation/RawFileSplitRead.java | 2 +- .../paimon/table/query/LocalTableQuery.java | 3 ++- .../paimon/io/KeyValueFileReadWriteTest.java | 5 +++-- .../paimon/mergetree/ContainsLevelsTest.java | 6 +++++- .../paimon/mergetree/LookupLevelsTest.java | 6 +++++- .../paimon/flink/clone/PickFilesUtil.java | 2 +- .../changelog/ChangelogCompactTask.java | 4 ++-- 19 files changed, 115 insertions(+), 33 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index a3087e362864..e70410b5d371 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -244,7 +244,7 @@ public void close() throws Exception { for (DataFileMeta file : compactAfter) { // appendOnlyCompactManager will rewrite the file and no file upgrade will occur, so we // can directly delete the file in compactAfter. - fileIO.deleteQuietly(pathFactory.toPath(file.fileName())); + fileIO.deleteQuietly(pathFactory.toPath(file.fileName(), file.externalPath())); } sinkWriter.close(); @@ -271,7 +271,7 @@ public void toBufferedWriter() throws Exception { } finally { // remove small files for (DataFileMeta file : files) { - fileIO.deleteQuietly(pathFactory.toPath(file.fileName())); + fileIO.deleteQuietly(pathFactory.toPath(file.fileName(), file.externalPath())); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index 3be09ea6c229..e2e9fece304f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -449,8 +449,8 @@ public DataFileMeta copyWithoutStats() { public List collectFiles(DataFilePathFactory pathFactory) { List paths = new ArrayList<>(); - paths.add(pathFactory.toPath(fileName)); - extraFiles.forEach(f -> paths.add(pathFactory.toPath(f))); + paths.add(pathFactory.toPath(fileName, externalPath)); + extraFiles.forEach(f -> paths.add(pathFactory.toPath(f, externalPath))); return paths; } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java index b632d44c9420..c32f81eb1036 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java @@ -82,6 +82,20 @@ public Path toPath(String fileName) { return new Path(parent + "/" + fileName); } + /** + * for read purpose. + * + * @param fileName the file name + * @param externalPath the external path, if null, it will use the parent path + * @return the file's path + */ + public Path toPath(String fileName, String externalPath) { + if (externalPath == null) { + return new Path(parent + "/" + fileName); + } + return new Path(externalPath + "/" + fileName); + } + @VisibleForTesting public String uuid() { return uuid; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index 7e272fc97c65..af55ae2eb00b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -97,16 +97,24 @@ private KeyValueFileReaderFactory( @Override public RecordReader createRecordReader(DataFileMeta file) throws IOException { - return createRecordReader(file.schemaId(), file.fileName(), file.fileSize(), file.level()); + return createRecordReader( + file.schemaId(), + file.fileName(), + file.fileSize(), + file.level(), + file.externalPath()); } public RecordReader createRecordReader( - long schemaId, String fileName, long fileSize, int level) throws IOException { + long schemaId, String fileName, long fileSize, int level, String externalPath) + throws IOException { if (fileSize >= asyncThreshold && fileName.endsWith(".orc")) { return new AsyncRecordReader<>( - () -> createRecordReader(schemaId, fileName, level, false, 2, fileSize)); + () -> + createRecordReader( + schemaId, fileName, level, false, 2, fileSize, externalPath)); } - return createRecordReader(schemaId, fileName, level, true, null, fileSize); + return createRecordReader(schemaId, fileName, level, true, null, fileSize, externalPath); } private FileRecordReader createRecordReader( @@ -115,7 +123,8 @@ private FileRecordReader createRecordReader( int level, boolean reuseFormat, @Nullable Integer orcPoolSize, - long fileSize) + long fileSize, + String externalPath) throws IOException { String formatIdentifier = DataFilePathFactory.formatIdentifier(fileName); @@ -132,7 +141,7 @@ private FileRecordReader createRecordReader( new FormatKey(schemaId, formatIdentifier), key -> formatSupplier.get()) : formatSupplier.get(); - Path filePath = pathFactory.toPath(fileName); + Path filePath = pathFactory.toPath(fileName, externalPath); FileRecordReader fileRecordReader = new DataFileRecordReader( diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index a6aae3985bd4..45ea189f52f4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -146,6 +146,10 @@ public void deleteFile(String filename, int level) { fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(filename)); } + public void deleteFile(String filename, int level, String externalPath) { + fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(filename, externalPath)); + } + public void copyFile(String sourceFileName, String targetFileName, int level) throws IOException { Path sourcePath = formatContext.pathFactory(level).toPath(sourceFileName); @@ -153,6 +157,14 @@ public void copyFile(String sourceFileName, String targetFileName, int level) fileIO.copyFile(sourcePath, targetPath, true); } + public void copyFile( + String sourceFileName, String targetFileName, int level, String externalPath) + throws IOException { + Path sourcePath = formatContext.pathFactory(level).toPath(sourceFileName, externalPath); + Path targetPath = formatContext.pathFactory(level).toPath(targetFileName, externalPath); + fileIO.copyFile(sourcePath, targetPath, true); + } + public FileIO getFileIO() { return fileIO; } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java index 060360623cd0..5d6d68144e1c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ExpireFileEntry.java @@ -41,8 +41,19 @@ public ExpireFileEntry( @Nullable byte[] embeddedIndex, BinaryRow minKey, BinaryRow maxKey, - @Nullable FileSource fileSource) { - super(kind, partition, bucket, level, fileName, extraFiles, embeddedIndex, minKey, maxKey); + @Nullable FileSource fileSource, + @Nullable String externalPath) { + super( + kind, + partition, + bucket, + level, + fileName, + extraFiles, + embeddedIndex, + minKey, + maxKey, + externalPath); this.fileSource = fileSource; } @@ -61,7 +72,8 @@ public static ExpireFileEntry from(ManifestEntry entry) { entry.file().embeddedIndex(), entry.minKey(), entry.maxKey(), - entry.file().fileSource().orElse(null)); + entry.file().fileSource().orElse(null), + entry.externalPath()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java index a2569beac61c..2eae103e3e1a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java @@ -56,6 +56,8 @@ public interface FileEntry { Identifier identifier(); + String externalPath(); + BinaryRow minKey(); BinaryRow maxKey(); diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java index 626e0a5d468f..aec6b9063003 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java @@ -92,6 +92,11 @@ public String fileName() { return file.fileName(); } + @Override + public String externalPath() { + return file.externalPath(); + } + @Override public BinaryRow minKey() { return file.minKey(); diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java index fdaed2b85aaf..984f24036009 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java @@ -38,6 +38,7 @@ public class SimpleFileEntry implements FileEntry { @Nullable private final byte[] embeddedIndex; private final BinaryRow minKey; private final BinaryRow maxKey; + private final String externalPath; public SimpleFileEntry( FileKind kind, @@ -48,7 +49,8 @@ public SimpleFileEntry( List extraFiles, @Nullable byte[] embeddedIndex, BinaryRow minKey, - BinaryRow maxKey) { + BinaryRow maxKey, + String externalPath) { this.kind = kind; this.partition = partition; this.bucket = bucket; @@ -58,6 +60,7 @@ public SimpleFileEntry( this.embeddedIndex = embeddedIndex; this.minKey = minKey; this.maxKey = maxKey; + this.externalPath = externalPath; } public static SimpleFileEntry from(ManifestEntry entry) { @@ -70,7 +73,8 @@ public static SimpleFileEntry from(ManifestEntry entry) { entry.file().extraFiles(), entry.file().embeddedIndex(), entry.minKey(), - entry.maxKey()); + entry.maxKey(), + entry.externalPath()); } public static List from(List entries) { @@ -122,6 +126,11 @@ public List extraFiles() { return extraFiles; } + @Override + public String externalPath() { + return externalPath; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -138,12 +147,14 @@ public boolean equals(Object o) { && Objects.equals(fileName, that.fileName) && Objects.equals(extraFiles, that.extraFiles) && Objects.equals(minKey, that.minKey) - && Objects.equals(maxKey, that.maxKey); + && Objects.equals(maxKey, that.maxKey) + && Objects.equals(externalPath, that.externalPath); } @Override public int hashCode() { - return Objects.hash(kind, partition, bucket, level, fileName, extraFiles, minKey, maxKey); + return Objects.hash( + kind, partition, bucket, level, fileName, extraFiles, minKey, maxKey, externalPath); } @Override @@ -165,6 +176,8 @@ public String toString() { + minKey + ", maxKey=" + maxKey + + ", externalPath=" + + externalPath + '}'; } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java index bdc89b8d4c3d..50065a93fadd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java @@ -63,6 +63,7 @@ public SimpleFileEntry convertFrom(int version, InternalRow row) { fromStringArrayData(file.getArray(11)), file.isNullAt(14) ? null : file.getBinary(14), deserializeBinaryRow(file.getBinary(3)), - deserializeBinaryRow(file.getBinary(4))); + deserializeBinaryRow(file.getBinary(4)), + file.getString(17).toString()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index f2a964bae16a..719f6721fdd5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -243,7 +243,11 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul for (DataFileMeta dataMeta : dataMetas) { DataFileMeta changelogMeta = dataMeta.rename(writerFactory.newChangelogPath(0).getName()); - writerFactory.copyFile(dataMeta.fileName(), changelogMeta.fileName(), 0); + writerFactory.copyFile( + dataMeta.fileName(), + changelogMeta.fileName(), + 0, + dataMeta.externalPath()); changelogMetas.add(changelogMeta); } newFilesChangelog.addAll(changelogMetas); @@ -341,7 +345,7 @@ private void updateCompactResult(CompactResult result) { // 2. This file is not the input of upgraded. if (!compactBefore.containsKey(file.fileName()) && !afterFiles.contains(file.fileName())) { - writerFactory.deleteFile(file.fileName(), file.level()); + writerFactory.deleteFile(file.fileName(), file.level(), file.externalPath()); } } else { compactBefore.put(file.fileName(), file); @@ -375,7 +379,7 @@ public void close() throws Exception { deletedFiles.clear(); for (DataFileMeta file : newFilesChangelog) { - writerFactory.deleteFile(file.fileName(), file.level()); + writerFactory.deleteFile(file.fileName(), file.level(), file.externalPath()); } newFilesChangelog.clear(); @@ -390,12 +394,12 @@ public void close() throws Exception { compactAfter.clear(); for (DataFileMeta file : compactChangelog) { - writerFactory.deleteFile(file.fileName(), file.level()); + writerFactory.deleteFile(file.fileName(), file.level(), file.externalPath()); } compactChangelog.clear(); for (DataFileMeta file : delete) { - writerFactory.deleteFile(file.fileName(), file.level()); + writerFactory.deleteFile(file.fileName(), file.level(), file.externalPath()); } if (compactDeletionFile != null) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 001132e1671c..f5556218f8b4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -593,7 +593,7 @@ public void abort(List commitMessages) { toDelete.addAll(commitMessage.compactIncrement().changelogFiles()); for (DataFileMeta file : toDelete) { - fileIO.deleteQuietly(pathFactory.toPath(file.fileName())); + fileIO.deleteQuietly(pathFactory.toPath(file.fileName(), file.externalPath())); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index 4fda82f4e88f..70452dd92ce0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -211,7 +211,7 @@ private FileRecordReader createFileReader( FormatReaderContext formatReaderContext = new FormatReaderContext( fileIO, - dataFilePathFactory.toPath(file.fileName()), + dataFilePathFactory.toPath(file.fileName(), file.externalPath()), file.fileSize(), fileIndexResult); FileRecordReader fileRecordReader = diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index 8ff5ce7a6580..2bedb10cc8cd 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -166,7 +166,8 @@ private void newLookupLevels(BinaryRow partition, int bucket, List file.schemaId(), file.fileName(), file.fileSize(), - file.level()); + file.level(), + file.externalPath()); if (cacheRowFilter != null) { reader = reader.filter( diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index e43cd898dbc2..e81756268980 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -78,7 +78,7 @@ public class KeyValueFileReadWriteTest { public void testReadNonExistentFile() { KeyValueFileReaderFactory readerFactory = createReaderFactory(tempDir.toString(), "avro", null, null); - assertThatThrownBy(() -> readerFactory.createRecordReader(0, "dummy_file.avro", 1, 0)) + assertThatThrownBy(() -> readerFactory.createRecordReader(0, "dummy_file.avro", 1, 0, null)) .hasMessageContaining( "you can configure 'snapshot.time-retained' option with a larger value."); } @@ -312,7 +312,8 @@ private void assertData( meta.schemaId(), meta.fileName(), meta.fileSize(), - meta.level())); + meta.level(), + meta.externalPath())); while (actualKvsIterator.hasNext()) { assertThat(expectedIterator.hasNext()).isTrue(); KeyValue actualKv = actualKvsIterator.next(); diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index be49311427a0..fa96765a4278 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -198,7 +198,11 @@ private LookupLevels createContainsLevels(Levels levels, MemorySize max file -> createReaderFactory() .createRecordReader( - 0, file.fileName(), file.fileSize(), file.level()), + 0, + file.fileName(), + file.fileSize(), + file.level(), + file.externalPath()), file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), new HashLookupStoreFactory( new CacheManager(MemorySize.ofMebiBytes(1)), diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index a678534042eb..56c45cfdc442 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -275,7 +275,11 @@ private LookupLevels createLookupLevels(Levels levels, MemorySize maxD file -> createReaderFactory() .createRecordReader( - 0, file.fileName(), file.fileSize(), file.level()), + 0, + file.fileName(), + file.fileSize(), + file.level(), + file.externalPath()), file -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX + UUID.randomUUID()), new HashLookupStoreFactory( new CacheManager(MemorySize.ofMebiBytes(1)), diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java index f83b5cf8f9e3..b7e1e608789b 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/clone/PickFilesUtil.java @@ -108,7 +108,7 @@ private static List getUsedFilesInternal( pathFactory .createDataFilePathFactory( simpleFileEntry.partition(), simpleFileEntry.bucket()) - .toPath(simpleFileEntry.fileName()); + .toPath(simpleFileEntry.fileName(), simpleFileEntry.externalPath()); dataFiles.add(dataFilePath); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java index 6b95e369074b..713dbdd16e16 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java @@ -96,7 +96,7 @@ public List doCompact(FileStoreTable table) throws Exception { outputStream, results, table, - dataFilePathFactory.toPath(meta.fileName()), + dataFilePathFactory.toPath(meta.fileName(), meta.externalPath()), bucket, false, meta); @@ -111,7 +111,7 @@ public List doCompact(FileStoreTable table) throws Exception { outputStream, results, table, - dataFilePathFactory.toPath(meta.fileName()), + dataFilePathFactory.toPath(meta.fileName(), meta.externalPath()), bucket, true, meta); From 82cda9684fa4005b04e1a5cacc6e62f6ca7d28fc Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Mon, 23 Dec 2024 21:41:42 +0800 Subject: [PATCH 2/9] support read external path in DataFileMeta --- .../java/org/apache/paimon/table/source/DataSplit.java | 6 +++++- .../java/org/apache/paimon/table/system/FilesTable.java | 8 ++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index bf60234214fa..56a420ff9845 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -180,8 +180,12 @@ public Optional> convertToRawFiles() { } private RawFile makeRawTableFile(String bucketPath, DataFileMeta file) { + String path = file.externalPath(); + if (path == null) { + path = bucketPath + "/" + file.fileName(); + } return new RawFile( - bucketPath + "/" + file.fileName(), + path, file.fileSize(), 0, file.fileSize(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index 6dcbb322d6d0..d8d5d91db8a9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -385,8 +385,12 @@ private LazyGenericRow toRow( dataSplit.partition()))), dataSplit::bucket, () -> - BinaryString.fromString( - dataSplit.bucketPath() + "/" + dataFileMeta.fileName()), + dataFileMeta.externalPath() == null + ? BinaryString.fromString( + dataSplit.bucketPath() + + "/" + + dataFileMeta.fileName()) + : BinaryString.fromString(dataFileMeta.externalPath()), () -> BinaryString.fromString( DataFilePathFactory.formatIdentifier( From f769ada6dfabd8df508b5053f259e176827469db Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Tue, 24 Dec 2024 00:03:54 +0800 Subject: [PATCH 3/9] new writen files will be persisted with externalPath propertity --- .../org/apache/paimon/io/DataFileMeta.java | 32 ++++++++++++++++--- .../paimon/io/KeyValueDataFileWriter.java | 3 +- .../apache/paimon/io/RowDataFileWriter.java | 3 +- .../apache/paimon/migrate/FileMetaUtils.java | 9 ++++-- .../paimon/append/AppendOnlyWriterTest.java | 1 + .../paimon/io/DataFileTestDataGenerator.java | 2 +- .../paimon/io/KeyValueFileReadWriteTest.java | 7 ++++ .../apache/paimon/table/source/SplitTest.java | 1 + .../apache/paimon/spark/ScanHelperTest.scala | 2 ++ 9 files changed, 50 insertions(+), 10 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index e2e9fece304f..14670918252b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -135,7 +135,8 @@ public static DataFileMeta forAppend( List extraFiles, @Nullable byte[] embeddedIndex, @Nullable FileSource fileSource, - @Nullable List valueStatsCols) { + @Nullable List valueStatsCols, + @Nullable String externalPath) { return new DataFileMeta( fileName, fileSize, @@ -154,7 +155,7 @@ public static DataFileMeta forAppend( embeddedIndex, fileSource, valueStatsCols, - null); + externalPath); } public DataFileMeta( @@ -173,7 +174,8 @@ public DataFileMeta( @Nullable Long deleteRowCount, @Nullable byte[] embeddedIndex, @Nullable FileSource fileSource, - @Nullable List valueStatsCols) { + @Nullable List valueStatsCols, + @Nullable String externalPath) { this( fileName, fileSize, @@ -192,7 +194,7 @@ public DataFileMeta( embeddedIndex, fileSource, valueStatsCols, - null); + externalPath); } public DataFileMeta( @@ -498,6 +500,28 @@ public DataFileMeta copy(byte[] newEmbeddedIndex) { externalPath); } + public DataFileMeta copy(String externalPath) { + return new DataFileMeta( + fileName, + fileSize, + rowCount, + minKey, + maxKey, + keyStats, + valueStats, + minSequenceNumber, + maxSequenceNumber, + schemaId, + level, + extraFiles, + creationTime, + deleteRowCount, + embeddedIndex, + fileSource, + valueStatsCols, + externalPath); + } + @Override public boolean equals(Object o) { if (o == this) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index 651c6a6f7b56..d77f6b71e446 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -195,7 +195,8 @@ public DataFileMeta result() throws IOException { deleteRecordCount, indexResult.embeddedIndexBytes(), fileSource, - valueStatsPair.getKey()); + valueStatsPair.getKey(), + path.getParent().toString()); } abstract Pair fetchKeyValueStats(SimpleColStats[] rowStats); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index 8c2e8ec9498c..ae5dfa07ad8a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -124,6 +124,7 @@ public DataFileMeta result() throws IOException { : Collections.singletonList(indexResult.independentIndexFile()), indexResult.embeddedIndexBytes(), fileSource, - statsPair.getKey()); + statsPair.getKey(), + path.getParent().toString()); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java index 391c5f9bb615..6beafb7dd7da 100644 --- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java @@ -125,7 +125,8 @@ private static DataFileMeta constructFileMeta( newPath, simpleStatsExtractor, fileIO, - table); + table, + newPath.getParent().toString()); } catch (IOException e) { throw new RuntimeException("error when construct file meta", e); } @@ -150,7 +151,8 @@ private static DataFileMeta constructFileMeta( Path path, SimpleStatsExtractor simpleStatsExtractor, FileIO fileIO, - Table table) + Table table, + String externalPath) throws IOException { SimpleStatsConverter statsArraySerializer = new SimpleStatsConverter(table.rowType()); @@ -169,7 +171,8 @@ private static DataFileMeta constructFileMeta( Collections.emptyList(), null, FileSource.APPEND, - null); + null, + externalPath); } public static BinaryRow writePartitionValue( diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index a9012ed89b34..51b7a0e6acd3 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -680,6 +680,7 @@ private DataFileMeta generateCompactAfter(List toCompact) throws I Collections.emptyList(), null, FileSource.APPEND, + null, null); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java index 7de4214beaa9..4a8e5c0ba2cf 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java @@ -176,7 +176,7 @@ public static class Data { public final DataFileMeta meta; public final List content; - private Data(BinaryRow partition, int bucket, DataFileMeta meta, List content) { + public Data(BinaryRow partition, int bucket, DataFileMeta meta, List content) { this.partition = partition; this.bucket = bucket; this.meta = meta; diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index e81756268980..20cdd25600a4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -54,6 +54,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; @@ -96,6 +97,7 @@ public void testWriteAndReadDataFileWithFileExtractingRollingFile() throws Excep private void testWriteAndReadDataFileImpl(String format) throws Exception { DataFileTestDataGenerator.Data data = gen.next(); KeyValueFileWriterFactory writerFactory = createWriterFactory(tempDir.toString(), format); + DataFileMetaSerializer serializer = new DataFileMetaSerializer(); RollingFileWriter writer = @@ -381,6 +383,11 @@ private void checkRollingFiles( for (DataFileMeta meta : actual) { assertThat(meta.level()).isEqualTo(expected.level()); } + + // assert actual externalPath is not null + for (DataFileMeta meta : actual) { + assertThat(Objects.requireNonNull(meta.externalPath())); + } } @ParameterizedTest diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java index a4e581b701ea..a088f40dab21 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitTest.java @@ -447,6 +447,7 @@ private DataFileMeta newDataFile(long rowCount) { Collections.emptyList(), null, null, + null, null); } diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala index a3223446f644..4997f65eaf0c 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala @@ -53,6 +53,7 @@ class ScanHelperTest extends PaimonSparkTestBase { new java.util.ArrayList[String](), null, FileSource.APPEND, + null, null) } @@ -89,6 +90,7 @@ class ScanHelperTest extends PaimonSparkTestBase { new java.util.ArrayList[String](), null, FileSource.APPEND, + null, null) ).asJava From 004da94188e62d2d027567d14ba7ae3441be2c85 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Tue, 24 Dec 2024 10:27:54 +0800 Subject: [PATCH 4/9] do not set externalPath when user not provide data-file.external-path --- .../apache/paimon/append/AppendOnlyWriter.java | 4 ++-- .../java/org/apache/paimon/io/DataFileMeta.java | 4 ++-- .../org/apache/paimon/io/DataFilePathFactory.java | 15 +++++++++++++++ .../org/apache/paimon/io/FileIndexEvaluator.java | 2 +- .../apache/paimon/io/KeyValueDataFileWriter.java | 5 ++++- .../paimon/io/KeyValueFileWriterFactory.java | 15 ++------------- .../org/apache/paimon/io/RowDataFileWriter.java | 7 ++++++- .../apache/paimon/mergetree/MergeTreeWriter.java | 8 ++++---- .../org/apache/paimon/migrate/FileMetaUtils.java | 9 ++++----- .../paimon/operation/FileStoreCommitImpl.java | 2 +- .../apache/paimon/operation/RawFileSplitRead.java | 5 +---- .../paimon/append/AppendOnlyWriterTest.java | 6 +++--- .../paimon/io/KeyValueFileReadWriteTest.java | 5 ----- .../paimon/mergetree/MergeTreeTestBase.java | 2 +- .../table/AppendOnlyFileStoreTableTest.java | 2 +- .../compact/changelog/ChangelogCompactTask.java | 5 +++-- .../paimon/flink/sink/RewriteFileIndexSink.java | 11 ++++++----- 17 files changed, 56 insertions(+), 51 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index e70410b5d371..4c313dd655c0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -244,7 +244,7 @@ public void close() throws Exception { for (DataFileMeta file : compactAfter) { // appendOnlyCompactManager will rewrite the file and no file upgrade will occur, so we // can directly delete the file in compactAfter. - fileIO.deleteQuietly(pathFactory.toPath(file.fileName(), file.externalPath())); + fileIO.deleteQuietly(pathFactory.toPath(file)); } sinkWriter.close(); @@ -271,7 +271,7 @@ public void toBufferedWriter() throws Exception { } finally { // remove small files for (DataFileMeta file : files) { - fileIO.deleteQuietly(pathFactory.toPath(file.fileName(), file.externalPath())); + fileIO.deleteQuietly(pathFactory.toPath(file)); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index 14670918252b..6e202cd1479e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -451,8 +451,8 @@ public DataFileMeta copyWithoutStats() { public List collectFiles(DataFilePathFactory pathFactory) { List paths = new ArrayList<>(); - paths.add(pathFactory.toPath(fileName, externalPath)); - extraFiles.forEach(f -> paths.add(pathFactory.toPath(f, externalPath))); + paths.add(pathFactory.toPath(this)); + extraFiles.forEach(f -> paths.add(pathFactory.toIndexPath(this, f))); return paths; } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java index c32f81eb1036..8c9d8a9d73e5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java @@ -78,6 +78,7 @@ private Path newPath(String prefix) { return new Path(parent, name); } + @VisibleForTesting public Path toPath(String fileName) { return new Path(parent + "/" + fileName); } @@ -96,6 +97,20 @@ public Path toPath(String fileName, String externalPath) { return new Path(externalPath + "/" + fileName); } + public Path toPath(DataFileMeta dataFileMeta) { + if (dataFileMeta.externalPath() == null) { + return new Path(parent + "/" + dataFileMeta.fileName()); + } + return new Path(dataFileMeta.externalPath() + "/" + dataFileMeta.fileName()); + } + + public Path toIndexPath(DataFileMeta dataFileMeta, String indexFileName) { + if (dataFileMeta.externalPath() == null) { + return new Path(parent + "/" + indexFileName); + } + return new Path(dataFileMeta.externalPath() + "/" + indexFileName); + } + @VisibleForTesting public String uuid() { return uuid; diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java index 530b87165322..9ca158a5d031 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java @@ -62,7 +62,7 @@ public static FileIndexResult evaluate( // go to file index check try (FileIndexPredicate predicate = new FileIndexPredicate( - dataFilePathFactory.toPath(indexFiles.get(0)), + dataFilePathFactory.toIndexPath(file, indexFiles.get(0)), fileIO, dataSchema.logicalRowType())) { return predicate.evaluate( diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index d77f6b71e446..fd204561cef4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -76,6 +76,7 @@ public abstract class KeyValueDataFileWriter private long minSeqNumber = Long.MAX_VALUE; private long maxSeqNumber = Long.MIN_VALUE; private long deleteRecordCount = 0; + private final boolean isExternalPath; public KeyValueDataFileWriter( FileIO fileIO, @@ -116,6 +117,7 @@ public KeyValueDataFileWriter( this.dataFileIndexWriter = DataFileIndexWriter.create( fileIO, dataFileToFileIndexPath(path), valueType, fileIndexOptions); + this.isExternalPath = false; } @Override @@ -177,6 +179,7 @@ public DataFileMeta result() throws IOException { ? DataFileIndexWriter.EMPTY_RESULT : dataFileIndexWriter.result(); + String externalPath = isExternalPath ? path.getParent().toString() : null; return new DataFileMeta( path.getName(), fileIO.getFileSize(path), @@ -196,7 +199,7 @@ public DataFileMeta result() throws IOException { indexResult.embeddedIndexBytes(), fileSource, valueStatsPair.getKey(), - path.getParent().toString()); + externalPath); } abstract Pair fetchKeyValueStats(SimpleColStats[] rowStats); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index 45ea189f52f4..a498cc37e5c1 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -142,19 +142,8 @@ private KeyValueDataFileWriter createDataFileWriter( fileIndexOptions); } - public void deleteFile(String filename, int level) { - fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(filename)); - } - - public void deleteFile(String filename, int level, String externalPath) { - fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(filename, externalPath)); - } - - public void copyFile(String sourceFileName, String targetFileName, int level) - throws IOException { - Path sourcePath = formatContext.pathFactory(level).toPath(sourceFileName); - Path targetPath = formatContext.pathFactory(level).toPath(targetFileName); - fileIO.copyFile(sourcePath, targetPath, true); + public void deleteFile(DataFileMeta meta, int level) { + fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(meta)); } public void copyFile( diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index ae5dfa07ad8a..814899b107ad 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -52,6 +52,7 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter commitMessages) { toDelete.addAll(commitMessage.compactIncrement().changelogFiles()); for (DataFileMeta file : toDelete) { - fileIO.deleteQuietly(pathFactory.toPath(file.fileName(), file.externalPath())); + fileIO.deleteQuietly(pathFactory.toPath(file)); } } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java index 70452dd92ce0..d0f3275b5afc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/RawFileSplitRead.java @@ -210,10 +210,7 @@ private FileRecordReader createFileReader( FormatReaderContext formatReaderContext = new FormatReaderContext( - fileIO, - dataFilePathFactory.toPath(file.fileName(), file.externalPath()), - file.fileSize(), - fileIndexResult); + fileIO, dataFilePathFactory.toPath(file), file.fileSize(), fileIndexResult); FileRecordReader fileRecordReader = new DataFileRecordReader( formatReaderMapping.getReaderFactory(), diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index 51b7a0e6acd3..3f752be13e73 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -125,7 +125,7 @@ public void testSingleWrite() throws Exception { DataFileMeta meta = increment.newFilesIncrement().newFiles().get(0); assertThat(meta).isNotNull(); - Path path = pathFactory.toPath(meta.fileName()); + Path path = pathFactory.toPath(meta); assertThat(LocalFileIO.create().exists(path)).isTrue(); assertThat(meta.rowCount()).isEqualTo(1L); @@ -186,7 +186,7 @@ public void testMultipleCommits() throws Exception { assertThat(inc.newFilesIncrement().newFiles().size()).isEqualTo(1); DataFileMeta meta = inc.newFilesIncrement().newFiles().get(0); - Path path = pathFactory.toPath(meta.fileName()); + Path path = pathFactory.toPath(meta); assertThat(LocalFileIO.create().exists(path)).isTrue(); assertThat(meta.rowCount()).isEqualTo(100L); @@ -227,7 +227,7 @@ public void testRollingWrite() throws Exception { int id = 0; for (DataFileMeta meta : firstInc.newFilesIncrement().newFiles()) { - Path path = pathFactory.toPath(meta.fileName()); + Path path = pathFactory.toPath(meta); assertThat(LocalFileIO.create().exists(path)).isTrue(); assertThat(meta.rowCount()).isEqualTo(1000L); diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index 20cdd25600a4..bef3721f1c71 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -383,11 +383,6 @@ private void checkRollingFiles( for (DataFileMeta meta : actual) { assertThat(meta.level()).isEqualTo(expected.level()); } - - // assert actual externalPath is not null - for (DataFileMeta meta : actual) { - assertThat(Objects.requireNonNull(meta.externalPath())); - } } @ParameterizedTest diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index f2a9c44dd7ce..47d12ce47cf4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -592,7 +592,7 @@ private void mergeCompacted( assertThat(remove).isTrue(); // See MergeTreeWriter.updateCompactResult if (!newFileNames.contains(file.fileName()) && !afterFiles.contains(file.fileName())) { - compactWriterFactory.deleteFile(file.fileName(), file.level()); + compactWriterFactory.deleteFile(file, file.level()); } } compactedFiles.addAll(increment.compactIncrement().compactAfter()); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java index 01d4e89af95d..471b60d3cfa5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/AppendOnlyFileStoreTableTest.java @@ -112,7 +112,7 @@ public void testReadDeletedFiles() throws Exception { table.store() .pathFactory() .createDataFilePathFactory(split.partition(), split.bucket()) - .toPath(split.dataFiles().get(0).fileName()); + .toPath(split.dataFiles().get(0)); table.fileIO().deleteQuietly(path); // read diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java index 713dbdd16e16..0d50294ce9aa 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java @@ -96,7 +96,7 @@ public List doCompact(FileStoreTable table) throws Exception { outputStream, results, table, - dataFilePathFactory.toPath(meta.fileName(), meta.externalPath()), + dataFilePathFactory.toPath(meta), bucket, false, meta); @@ -111,7 +111,7 @@ public List doCompact(FileStoreTable table) throws Exception { outputStream, results, table, - dataFilePathFactory.toPath(meta.fileName(), meta.externalPath()), + dataFilePathFactory.toPath(meta), bucket, true, meta); @@ -168,6 +168,7 @@ private List produceNewCommittables( .rename( changelogTempPath, dataFilePathFactory.toPath( + baseResult.meta.externalPath(), realName + "." + CompactedChangelogReadOnlyFormat.getIdentifier( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java index d9f863c6b919..af6b7df33e7e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java @@ -198,16 +198,17 @@ public DataFileMeta process(BinaryRow partition, int bucket, DataFileMeta dataFi String indexFile = indexFiles.get(0); try (FileIndexFormat.Reader indexReader = FileIndexFormat.createReader( - fileIO.newInputStream(dataFilePathFactory.toPath(indexFile)), + fileIO.newInputStream( + dataFilePathFactory.toIndexPath(dataFileMeta, indexFile)), schemaInfo.fileSchema)) { maintainers = indexReader.readAll(); } - newIndexPath = createNewFileIndexFilePath(dataFilePathFactory.toPath(indexFile)); + newIndexPath = + createNewFileIndexFilePath( + dataFilePathFactory.toIndexPath(dataFileMeta, indexFile)); } else { maintainers = new HashMap<>(); - newIndexPath = - dataFileToFileIndexPath( - dataFilePathFactory.toPath(dataFileMeta.fileName())); + newIndexPath = dataFileToFileIndexPath(dataFilePathFactory.toPath(dataFileMeta)); } // remove unnecessary From e51d64bcf1eee34d729f55441e9ec1e927bfffe9 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Tue, 24 Dec 2024 10:40:31 +0800 Subject: [PATCH 5/9] remove useless codes --- .../org/apache/paimon/io/KeyValueFileReaderFactory.java | 2 ++ .../java/org/apache/paimon/migrate/FileMetaUtils.java | 1 - .../org/apache/paimon/table/query/LocalTableQuery.java | 8 +------- .../org/apache/paimon/io/KeyValueFileReadWriteTest.java | 2 -- .../flink/compact/changelog/ChangelogCompactTask.java | 4 ++-- 5 files changed, 5 insertions(+), 12 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java index af55ae2eb00b..9d65a5411364 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileReaderFactory.java @@ -20,6 +20,7 @@ import org.apache.paimon.CoreOptions; import org.apache.paimon.KeyValue; +import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; import org.apache.paimon.deletionvectors.ApplyDeletionVectorReader; @@ -105,6 +106,7 @@ public RecordReader createRecordReader(DataFileMeta file) throws IOExc file.externalPath()); } + @VisibleForTesting public RecordReader createRecordReader( long schemaId, String fileName, long fileSize, int level, String externalPath) throws IOException { diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java index 1d5811d389e0..51a0b5e2a92e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java @@ -118,7 +118,6 @@ private static DataFileMeta constructFileMeta( new RuntimeException( "Can't get table stats extractor for format " + format)); - Path newPath = renameFile(fileIO, fileStatus.getPath(), dir, format, rollback); return constructFileMeta( newPath.getName(), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java index 2bedb10cc8cd..d474d4e2d023 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java @@ -161,13 +161,7 @@ private void newLookupLevels(BinaryRow partition, int bucket, List readerFactoryBuilder.keyType(), new LookupLevels.KeyValueProcessor(readerFactoryBuilder.readValueType()), file -> { - RecordReader reader = - factory.createRecordReader( - file.schemaId(), - file.fileName(), - file.fileSize(), - file.level(), - file.externalPath()); + RecordReader reader = factory.createRecordReader(file); if (cacheRowFilter != null) { reader = reader.filter( diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index bef3721f1c71..e81756268980 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -54,7 +54,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import java.util.function.Function; @@ -97,7 +96,6 @@ public void testWriteAndReadDataFileWithFileExtractingRollingFile() throws Excep private void testWriteAndReadDataFileImpl(String format) throws Exception { DataFileTestDataGenerator.Data data = gen.next(); KeyValueFileWriterFactory writerFactory = createWriterFactory(tempDir.toString(), format); - DataFileMetaSerializer serializer = new DataFileMetaSerializer(); RollingFileWriter writer = diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java index 0d50294ce9aa..330488435904 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java @@ -168,11 +168,11 @@ private List produceNewCommittables( .rename( changelogTempPath, dataFilePathFactory.toPath( - baseResult.meta.externalPath(), realName + "." + CompactedChangelogReadOnlyFormat.getIdentifier( - baseResult.meta.fileFormat()))); + baseResult.meta.fileFormat()), + baseResult.meta.externalPath())); List newCommittables = new ArrayList<>(); From e309dd0e62b627c6e701b312a9e7b21815489dff Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Tue, 24 Dec 2024 10:46:19 +0800 Subject: [PATCH 6/9] remove useless codes --- .../org/apache/paimon/io/DataFileMeta.java | 22 ------------------- .../io/DataFileMeta10LegacySerializer.java | 2 +- .../paimon/io/DataFileTestDataGenerator.java | 2 +- 3 files changed, 2 insertions(+), 24 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index 6e202cd1479e..aaa18b5c65ce 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -500,28 +500,6 @@ public DataFileMeta copy(byte[] newEmbeddedIndex) { externalPath); } - public DataFileMeta copy(String externalPath) { - return new DataFileMeta( - fileName, - fileSize, - rowCount, - minKey, - maxKey, - keyStats, - valueStats, - minSequenceNumber, - maxSequenceNumber, - schemaId, - level, - extraFiles, - creationTime, - deleteRowCount, - embeddedIndex, - fileSource, - valueStatsCols, - externalPath); - } - @Override public boolean equals(Object o) { if (o == this) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java index 68ccba6ea31c..518db7c65834 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta10LegacySerializer.java @@ -46,7 +46,7 @@ import static org.apache.paimon.utils.SerializationUtils.newStringType; import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; -/** Serializer for {@link DataFileMeta} with 0.9 version. */ +/** Serializer for {@link DataFileMeta} with 1.0 snapshot version. */ public class DataFileMeta10LegacySerializer implements Serializable { private static final long serialVersionUID = 1L; diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java index 4a8e5c0ba2cf..7de4214beaa9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java @@ -176,7 +176,7 @@ public static class Data { public final DataFileMeta meta; public final List content; - public Data(BinaryRow partition, int bucket, DataFileMeta meta, List content) { + private Data(BinaryRow partition, int bucket, DataFileMeta meta, List content) { this.partition = partition; this.bucket = bucket; this.meta = meta; From 3860b0901df173d087e6330acb504adbe79583d2 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Tue, 24 Dec 2024 11:43:49 +0800 Subject: [PATCH 7/9] fix review comments --- .../org/apache/paimon/io/DataFileMeta.java | 6 +- .../apache/paimon/io/DataFilePathFactory.java | 6 +- .../apache/paimon/io/FileIndexEvaluator.java | 2 +- .../paimon/io/KeyValueDataFileWriter.java | 5 +- .../paimon/io/KeyValueFileWriterFactory.java | 7 +- .../apache/paimon/io/RowDataFileWriter.java | 6 +- .../org/apache/paimon/manifest/FileEntry.java | 17 +++-- .../apache/paimon/manifest/ManifestEntry.java | 3 +- .../paimon/manifest/SimpleFileEntry.java | 17 ++--- .../manifest/SimpleFileEntrySerializer.java | 69 ------------------- .../paimon/mergetree/MergeTreeWriter.java | 10 ++- .../changelog/ChangelogCompactTask.java | 10 +-- .../flink/sink/RewriteFileIndexSink.java | 5 +- 13 files changed, 47 insertions(+), 116 deletions(-) delete mode 100644 paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index aaa18b5c65ce..459cd788de53 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -405,7 +405,7 @@ public DataFileMeta upgrade(int newLevel) { externalPath); } - public DataFileMeta rename(String newFileName) { + public DataFileMeta rename(String newExternalPath, String newFileName) { return new DataFileMeta( newFileName, fileSize, @@ -424,7 +424,7 @@ public DataFileMeta rename(String newFileName) { embeddedIndex, fileSource, valueStatsCols, - externalPath); + newExternalPath); } public DataFileMeta copyWithoutStats() { @@ -452,7 +452,7 @@ public DataFileMeta copyWithoutStats() { public List collectFiles(DataFilePathFactory pathFactory) { List paths = new ArrayList<>(); paths.add(pathFactory.toPath(this)); - extraFiles.forEach(f -> paths.add(pathFactory.toIndexPath(this, f))); + extraFiles.forEach(f -> paths.add(pathFactory.toExtraFilePath(this, f))); return paths; } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java index 8c9d8a9d73e5..4c56ff56b3ea 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java @@ -104,11 +104,11 @@ public Path toPath(DataFileMeta dataFileMeta) { return new Path(dataFileMeta.externalPath() + "/" + dataFileMeta.fileName()); } - public Path toIndexPath(DataFileMeta dataFileMeta, String indexFileName) { + public Path toExtraFilePath(DataFileMeta dataFileMeta, String extraFile) { if (dataFileMeta.externalPath() == null) { - return new Path(parent + "/" + indexFileName); + return new Path(parent + "/" + extraFile); } - return new Path(dataFileMeta.externalPath() + "/" + indexFileName); + return new Path(dataFileMeta.externalPath() + "/" + extraFile); } @VisibleForTesting diff --git a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java index 9ca158a5d031..9055097d3718 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/FileIndexEvaluator.java @@ -62,7 +62,7 @@ public static FileIndexResult evaluate( // go to file index check try (FileIndexPredicate predicate = new FileIndexPredicate( - dataFilePathFactory.toIndexPath(file, indexFiles.get(0)), + dataFilePathFactory.toExtraFilePath(file, indexFiles.get(0)), fileIO, dataSchema.logicalRowType())) { return predicate.evaluate( diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index fd204561cef4..f78d7556487f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -76,7 +76,6 @@ public abstract class KeyValueDataFileWriter private long minSeqNumber = Long.MAX_VALUE; private long maxSeqNumber = Long.MIN_VALUE; private long deleteRecordCount = 0; - private final boolean isExternalPath; public KeyValueDataFileWriter( FileIO fileIO, @@ -117,7 +116,6 @@ public KeyValueDataFileWriter( this.dataFileIndexWriter = DataFileIndexWriter.create( fileIO, dataFileToFileIndexPath(path), valueType, fileIndexOptions); - this.isExternalPath = false; } @Override @@ -179,7 +177,6 @@ public DataFileMeta result() throws IOException { ? DataFileIndexWriter.EMPTY_RESULT : dataFileIndexWriter.result(); - String externalPath = isExternalPath ? path.getParent().toString() : null; return new DataFileMeta( path.getName(), fileIO.getFileSize(path), @@ -199,7 +196,7 @@ public DataFileMeta result() throws IOException { indexResult.embeddedIndexBytes(), fileSource, valueStatsPair.getKey(), - externalPath); + null); } abstract Pair fetchKeyValueStats(SimpleColStats[] rowStats); diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index a498cc37e5c1..500320c24947 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -146,11 +146,10 @@ public void deleteFile(DataFileMeta meta, int level) { fileIO.deleteQuietly(formatContext.pathFactory(level).toPath(meta)); } - public void copyFile( - String sourceFileName, String targetFileName, int level, String externalPath) + public void copyFile(DataFileMeta sourceMeta, DataFileMeta targetMeta, int level) throws IOException { - Path sourcePath = formatContext.pathFactory(level).toPath(sourceFileName, externalPath); - Path targetPath = formatContext.pathFactory(level).toPath(targetFileName, externalPath); + Path sourcePath = formatContext.pathFactory(level).toPath(sourceMeta); + Path targetPath = formatContext.pathFactory(level).toPath(targetMeta); fileIO.copyFile(sourcePath, targetPath, true); } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index 814899b107ad..6fdcf0e759e3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -52,7 +52,6 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter extraFiles; @Nullable private final byte[] embeddedIndex; + @Nullable public final String externalPath; /* Cache the hash code for the string */ private Integer hash; @@ -85,13 +86,15 @@ public Identifier( int level, String fileName, List extraFiles, - @Nullable byte[] embeddedIndex) { + @Nullable byte[] embeddedIndex, + @Nullable String externalPath) { this.partition = partition; this.bucket = bucket; this.level = level; this.fileName = fileName; this.extraFiles = extraFiles; this.embeddedIndex = embeddedIndex; + this.externalPath = externalPath; } @Override @@ -108,7 +111,8 @@ public boolean equals(Object o) { && Objects.equals(partition, that.partition) && Objects.equals(fileName, that.fileName) && Objects.equals(extraFiles, that.extraFiles) - && Objects.deepEquals(embeddedIndex, that.embeddedIndex); + && Objects.deepEquals(embeddedIndex, that.embeddedIndex) + && Objects.deepEquals(externalPath, that.externalPath); } @Override @@ -121,7 +125,8 @@ public int hashCode() { level, fileName, extraFiles, - Arrays.hashCode(embeddedIndex)); + Arrays.hashCode(embeddedIndex), + externalPath); } return hash; } @@ -140,6 +145,8 @@ public String toString() { + extraFiles + ", embeddedIndex=" + Arrays.toString(embeddedIndex) + + ", externalPath=" + + externalPath + '}'; } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java index aec6b9063003..d4748451d8ca 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntry.java @@ -128,7 +128,8 @@ public Identifier identifier() { file.level(), file.fileName(), file.extraFiles(), - file.embeddedIndex()); + file.embeddedIndex(), + file.externalPath()); } public ManifestEntry copyWithoutStats() { diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java index 984f24036009..f86bded52d46 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntry.java @@ -38,7 +38,7 @@ public class SimpleFileEntry implements FileEntry { @Nullable private final byte[] embeddedIndex; private final BinaryRow minKey; private final BinaryRow maxKey; - private final String externalPath; + @Nullable private final String externalPath; public SimpleFileEntry( FileKind kind, @@ -50,7 +50,7 @@ public SimpleFileEntry( @Nullable byte[] embeddedIndex, BinaryRow minKey, BinaryRow maxKey, - String externalPath) { + @Nullable String externalPath) { this.kind = kind; this.partition = partition; this.bucket = bucket; @@ -106,9 +106,15 @@ public String fileName() { return fileName; } + @Override + public String externalPath() { + return externalPath; + } + @Override public Identifier identifier() { - return new Identifier(partition, bucket, level, fileName, extraFiles, embeddedIndex); + return new Identifier( + partition, bucket, level, fileName, extraFiles, embeddedIndex, externalPath); } @Override @@ -126,11 +132,6 @@ public List extraFiles() { return extraFiles; } - @Override - public String externalPath() { - return externalPath; - } - @Override public boolean equals(Object o) { if (this == o) { diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java deleted file mode 100644 index 50065a93fadd..000000000000 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/SimpleFileEntrySerializer.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.manifest; - -import org.apache.paimon.data.InternalRow; -import org.apache.paimon.io.DataFileMeta; -import org.apache.paimon.utils.VersionedObjectSerializer; - -import static org.apache.paimon.utils.InternalRowUtils.fromStringArrayData; -import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; - -/** A {@link VersionedObjectSerializer} for {@link SimpleFileEntry}, only supports reading. */ -public class SimpleFileEntrySerializer extends VersionedObjectSerializer { - - private static final long serialVersionUID = 1L; - - private final int version; - - public SimpleFileEntrySerializer() { - super(ManifestEntry.SCHEMA); - this.version = new ManifestEntrySerializer().getVersion(); - } - - @Override - public int getVersion() { - return version; - } - - @Override - public InternalRow convertTo(SimpleFileEntry meta) { - throw new UnsupportedOperationException("Only supports convert from row."); - } - - @Override - public SimpleFileEntry convertFrom(int version, InternalRow row) { - if (this.version != version) { - throw new IllegalArgumentException("Unsupported version: " + version); - } - - InternalRow file = row.getRow(4, DataFileMeta.SCHEMA.getFieldCount()); - return new SimpleFileEntry( - FileKind.fromByteValue(row.getByte(0)), - deserializeBinaryRow(row.getBinary(1)), - row.getInt(2), - file.getInt(10), - file.getString(0).toString(), - fromStringArrayData(file.getArray(11)), - file.isNullAt(14) ? null : file.getBinary(14), - deserializeBinaryRow(file.getBinary(3)), - deserializeBinaryRow(file.getBinary(4)), - file.getString(17).toString()); - } -} diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index 707d08dad259..df4855922360 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -27,6 +27,7 @@ import org.apache.paimon.compression.CompressOptions; import org.apache.paimon.data.InternalRow; import org.apache.paimon.disk.IOManager; +import org.apache.paimon.fs.Path; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; @@ -241,13 +242,10 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul } else if (changelogProducer == ChangelogProducer.INPUT && isInsertOnly) { List changelogMetas = new ArrayList<>(); for (DataFileMeta dataMeta : dataMetas) { + Path newPath = writerFactory.newChangelogPath(0); DataFileMeta changelogMeta = - dataMeta.rename(writerFactory.newChangelogPath(0).getName()); - writerFactory.copyFile( - dataMeta.fileName(), - changelogMeta.fileName(), - 0, - dataMeta.externalPath()); + dataMeta.rename(newPath.getParent().getName(), newPath.getName()); + writerFactory.copyFile(dataMeta, changelogMeta, 0); changelogMetas.add(changelogMeta); } newFilesChangelog.addAll(changelogMetas); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java index 330488435904..39860e418c18 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/ChangelogCompactTask.java @@ -167,12 +167,12 @@ private List produceNewCommittables( table.fileIO() .rename( changelogTempPath, - dataFilePathFactory.toPath( + dataFilePathFactory.toExtraFilePath( + baseResult.meta, realName + "." + CompactedChangelogReadOnlyFormat.getIdentifier( - baseResult.meta.fileFormat()), - baseResult.meta.externalPath())); + baseResult.meta.fileFormat()))); List newCommittables = new ArrayList<>(); @@ -194,9 +194,9 @@ private List produceNewCommittables( + CompactedChangelogReadOnlyFormat.getIdentifier( result.meta.fileFormat()); if (result.isCompactResult) { - compactChangelog.add(result.meta.rename(name)); + compactChangelog.add(result.meta.rename(baseResult.meta.externalPath(), name)); } else { - newFilesChangelog.add(result.meta.rename(name)); + newFilesChangelog.add(result.meta.rename(baseResult.meta.externalPath(), name)); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java index af6b7df33e7e..d35cb09cb78a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RewriteFileIndexSink.java @@ -199,13 +199,14 @@ public DataFileMeta process(BinaryRow partition, int bucket, DataFileMeta dataFi try (FileIndexFormat.Reader indexReader = FileIndexFormat.createReader( fileIO.newInputStream( - dataFilePathFactory.toIndexPath(dataFileMeta, indexFile)), + dataFilePathFactory.toExtraFilePath( + dataFileMeta, indexFile)), schemaInfo.fileSchema)) { maintainers = indexReader.readAll(); } newIndexPath = createNewFileIndexFilePath( - dataFilePathFactory.toIndexPath(dataFileMeta, indexFile)); + dataFilePathFactory.toExtraFilePath(dataFileMeta, indexFile)); } else { maintainers = new HashMap<>(); newIndexPath = dataFileToFileIndexPath(dataFilePathFactory.toPath(dataFileMeta)); From df8efa30c40c98b361c5b6d0a3af290c0a5b42d6 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Tue, 24 Dec 2024 11:51:21 +0800 Subject: [PATCH 8/9] fix review comments --- .../main/java/org/apache/paimon/io/RowDataFileWriter.java | 1 - .../main/java/org/apache/paimon/table/source/DataSplit.java | 2 ++ .../main/java/org/apache/paimon/table/system/FilesTable.java | 5 ++++- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index 6fdcf0e759e3..cd46d67e3b60 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -111,7 +111,6 @@ public DataFileMeta result() throws IOException { dataFileIndexWriter == null ? DataFileIndexWriter.EMPTY_RESULT : dataFileIndexWriter.result(); - return DataFileMeta.forAppend( path.getName(), fileIO.getFileSize(path), diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index 56a420ff9845..87fa58a16453 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -183,6 +183,8 @@ private RawFile makeRawTableFile(String bucketPath, DataFileMeta file) { String path = file.externalPath(); if (path == null) { path = bucketPath + "/" + file.fileName(); + } else { + path = path + "/" + file.fileName(); } return new RawFile( path, diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java index d8d5d91db8a9..3107ebe150e3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FilesTable.java @@ -390,7 +390,10 @@ private LazyGenericRow toRow( dataSplit.bucketPath() + "/" + dataFileMeta.fileName()) - : BinaryString.fromString(dataFileMeta.externalPath()), + : BinaryString.fromString( + dataFileMeta.externalPath() + + "/" + + dataFileMeta.fileName()), () -> BinaryString.fromString( DataFilePathFactory.formatIdentifier( From a9f0fd4ea0f7a0132964578f90ab68210e268e65 Mon Sep 17 00:00:00 2001 From: HouliangQi Date: Tue, 24 Dec 2024 12:58:12 +0800 Subject: [PATCH 9/9] fix review comments --- .../apache/paimon/io/DataFilePathFactory.java | 18 ++++++------------ .../apache/paimon/table/source/DataSplit.java | 8 ++------ 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java index 4c56ff56b3ea..daeb9f52eada 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFilePathFactory.java @@ -91,24 +91,18 @@ public Path toPath(String fileName) { * @return the file's path */ public Path toPath(String fileName, String externalPath) { - if (externalPath == null) { - return new Path(parent + "/" + fileName); - } - return new Path(externalPath + "/" + fileName); + return new Path((externalPath == null ? parent : externalPath) + "/" + fileName); } public Path toPath(DataFileMeta dataFileMeta) { - if (dataFileMeta.externalPath() == null) { - return new Path(parent + "/" + dataFileMeta.fileName()); - } - return new Path(dataFileMeta.externalPath() + "/" + dataFileMeta.fileName()); + String externalPath = dataFileMeta.externalPath(); + String fileName = dataFileMeta.fileName(); + return new Path((externalPath == null ? parent : externalPath) + "/" + fileName); } public Path toExtraFilePath(DataFileMeta dataFileMeta, String extraFile) { - if (dataFileMeta.externalPath() == null) { - return new Path(parent + "/" + extraFile); - } - return new Path(dataFileMeta.externalPath() + "/" + extraFile); + String externalPath = dataFileMeta.externalPath(); + return new Path((externalPath == null ? parent : externalPath) + "/" + extraFile); } @VisibleForTesting diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java index 87fa58a16453..9178d25a912e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/DataSplit.java @@ -180,12 +180,8 @@ public Optional> convertToRawFiles() { } private RawFile makeRawTableFile(String bucketPath, DataFileMeta file) { - String path = file.externalPath(); - if (path == null) { - path = bucketPath + "/" + file.fileName(); - } else { - path = path + "/" + file.fileName(); - } + String path = file.externalPath() != null ? file.externalPath() : bucketPath; + path += "/" + file.fileName(); return new RawFile( path, file.fileSize(),