From 10fdaa64620c687992df4c4f9740a4b14dfc0e42 Mon Sep 17 00:00:00 2001 From: xiayuxiao Date: Fri, 29 Aug 2025 10:19:59 +0800 Subject: [PATCH 1/4] [core] Fix checkpoint recovery failure for compacted changelog files --- .../paimon/table/sink/TableCommitImpl.java | 12 +- .../utils/CompactedChangelogPathResolver.java | 67 +++++++ .../paimon/table/sink/TableCommitTest.java | 178 ++++++++++++++++++ .../CompactedChangelogPathResolverTest.java | 96 ++++++++++ 4 files changed, 352 insertions(+), 1 deletion(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java index c1a69a239163..95b2df34f513 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java @@ -33,6 +33,7 @@ import org.apache.paimon.tag.TagAutoCreation; import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.tag.TagTimeExpire; +import org.apache.paimon.utils.CompactedChangelogPathResolver; import org.apache.paimon.utils.DataFilePathFactories; import org.apache.paimon.utils.ExecutorThreadFactory; import org.apache.paimon.utils.IndexFilePathFactories; @@ -300,6 +301,15 @@ private void checkFilesExistence(List committables) { } } + // Resolve compacted changelog files to their real file paths + List resolvedFiles = new ArrayList<>(); + for (Path file : files) { + resolvedFiles.add(CompactedChangelogPathResolver.resolveCompactedChangelogPath(file)); + } + // Deduplicate paths as multiple compacted changelog references may resolve to the same + // physical file + resolvedFiles = resolvedFiles.stream().distinct().collect(Collectors.toList()); + Predicate nonExists = p -> { try { @@ -314,7 +324,7 @@ private void checkFilesExistence(List committables) { randomlyExecuteSequentialReturn( getExecutorService(null), f -> nonExists.test(f) ? singletonList(f) : emptyList(), - files)); + resolvedFiles)); if (!nonExistFiles.isEmpty()) { String message = diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java b/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java new file mode 100644 index 000000000000..d65c9b070a53 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.fs.Path; + +/** + * Utility class for resolving compacted changelog file paths. + * + *

This class provides functionality to resolve fake compacted changelog file paths to their real + * file paths, following the same protocol as CompactedChangelogFormatReaderFactory. + */ +public class CompactedChangelogPathResolver { + + /** + * Resolves compacted changelog file path to its real file path. Handles both real and fake + * compacted changelog file names as described in CompactedChangelogFormatReaderFactory. + * + * @param path the file path to resolve + * @return the resolved real file path + */ + public static Path resolveCompactedChangelogPath(Path path) { + // Check if this is a compacted changelog file by its .cc- extension + String[] nameAndFormat = path.getName().split("\\."); + if (nameAndFormat.length < 2 || !nameAndFormat[1].startsWith("cc-")) { + return path; + } + + String[] names = nameAndFormat[0].split("\\$"); + String[] split = names[1].split("-"); + if (split.length == 2) { + // Real file: compacted-changelog-xxx$bid-len.cc-format + return path; + } else { + // Fake file: compacted-changelog-xxx$bid-len-off-len2.cc-format + // Resolve to real file: bucket-bid/compacted-changelog-xxx$bid-len.cc-format + return new Path( + path.getParent().getParent(), + "bucket-" + + split[0] + + "/" + + names[0] + + "$" + + split[0] + + "-" + + split[1] + + "." + + nameAndFormat[1]); + } + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java index f8cb17bd91c8..0a39ca7ef8b4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java @@ -24,7 +24,10 @@ import org.apache.paimon.data.GenericRow; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.io.DataIncrement; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestEntry; @@ -34,6 +37,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.SchemaUtils; import org.apache.paimon.schema.TableSchema; +import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.table.CatalogEnvironment; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.FileStoreTableFactory; @@ -438,4 +442,178 @@ public void testExpireForEmptyCommit() throws Exception { assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(5); assertThat(snapshotManager.latestSnapshotId()).isEqualTo(6); } + + @Test + public void testRecoverCompactedChangelogFiles() throws Exception { + String path = tempDir.toString(); + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.BIGINT()}, + new String[] {"k", "v"}); + + Options options = new Options(); + options.set(CoreOptions.PATH, path); + options.set(CoreOptions.BUCKET, 3); + TableSchema tableSchema = + SchemaUtils.forceCommit( + new SchemaManager(LocalFileIO.create(), new Path(path)), + new Schema( + rowType.getFields(), + Collections.emptyList(), + Collections.singletonList("k"), + options.toMap(), + "")); + + FileStoreTable table = + FileStoreTableFactory.create( + LocalFileIO.create(), + new Path(path), + tableSchema, + CatalogEnvironment.empty()); + + // Create fake compacted changelog files that should resolve to real files + String realChangelogFile = + "compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253.cc-parquet"; + String fakeChangelogFile1 = + "compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253-39253-35699.cc-parquet"; + String fakeChangelogFile2 = + "compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253-74952-37725.cc-parquet"; + + // Create directory structure + Path bucket0Dir = new Path(path, "bucket-0"); + Path bucket1Dir = new Path(path, "bucket-1"); + Path bucket2Dir = new Path(path, "bucket-2"); + LocalFileIO.create().mkdirs(bucket0Dir); + LocalFileIO.create().mkdirs(bucket1Dir); + LocalFileIO.create().mkdirs(bucket2Dir); + + // Create the real compacted changelog file + Path realFilePath = new Path(bucket0Dir, realChangelogFile); + LocalFileIO.create().newOutputStream(realFilePath, false).close(); + + DataFileMeta realFileMeta = + DataFileMeta.forAppend( + realChangelogFile, + 3000L, + 300L, + SimpleStats.EMPTY_STATS, + 0L, + 0L, + 1L, + Collections.emptyList(), + null, + null, + null, + null, + null, + null); + + // Create fake DataFileMeta for compacted changelog files + DataFileMeta fakeFileMeta1 = + DataFileMeta.forAppend( + fakeChangelogFile1, + 1000L, + 100L, + SimpleStats.EMPTY_STATS, + 0L, + 0L, + 1L, + Collections.emptyList(), + null, + null, + null, + null, + null, + null); + + DataFileMeta fakeFileMeta2 = + DataFileMeta.forAppend( + fakeChangelogFile2, + 2000L, + 200L, + SimpleStats.EMPTY_STATS, + 0L, + 0L, + 1L, + Collections.emptyList(), + null, + null, + null, + null, + null, + null); + + // Create commit message with fake compacted changelog files + BinaryRow partition = BinaryRow.EMPTY_ROW; + CommitMessageImpl commitMessage0 = + new CommitMessageImpl( + partition, + 0, + 3, + new DataIncrement( + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(realFileMeta)), + new CompactIncrement( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList())); + CommitMessageImpl commitMessage1 = + new CommitMessageImpl( + partition, + 1, + 3, + new DataIncrement( + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(fakeFileMeta1)), + new CompactIncrement( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList())); + CommitMessageImpl commitMessage2 = + new CommitMessageImpl( + partition, + 2, + 3, + new DataIncrement( + Collections.emptyList(), + Collections.emptyList(), + Collections.singletonList(fakeFileMeta2)), + new CompactIncrement( + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList())); + + ManifestCommittable committable = new ManifestCommittable(1L); + committable.addFileCommittable(commitMessage0); + committable.addFileCommittable(commitMessage1); + committable.addFileCommittable(commitMessage2); + + String commitUser = UUID.randomUUID().toString(); + try (TableCommitImpl commit = table.newCommit(commitUser)) { + // This should succeed because fake files resolve to the existing real file + commit.filterAndCommitMultiple(Collections.singletonList(committable), false); + } + + // Now delete the real file and test that the check fails + LocalFileIO.create().delete(realFilePath, false); + + // Create a new committable with a larger identifier to simulate recovery from checkpoint + // This identifier must be larger than the previously committed identifier (1L) + ManifestCommittable newCommittable = new ManifestCommittable(2L); + newCommittable.addFileCommittable(commitMessage0); + newCommittable.addFileCommittable(commitMessage1); + newCommittable.addFileCommittable(commitMessage2); + + try (TableCommitImpl commit = table.newCommit(commitUser)) { + assertThatThrownBy( + () -> + commit.filterAndCommitMultiple( + Collections.singletonList(newCommittable), false)) + .hasMessageContaining( + "Cannot recover from this checkpoint because some files in the" + + " snapshot that need to be resubmitted have been deleted"); + } + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java new file mode 100644 index 000000000000..595d66428172 --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.utils; + +import org.apache.paimon.fs.Path; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link CompactedChangelogPathResolver}. */ +public class CompactedChangelogPathResolverTest { + + @Test + public void testResolveNonCompactedChangelogFile() { + // Test regular changelog file - should return unchanged + Path regularFile = + new Path( + "/path/to/table/bucket-0/changelog-25b05ab0-6f90-4865-a984-8d9629bac735-1426.parquet"); + Path resolved = CompactedChangelogPathResolver.resolveCompactedChangelogPath(regularFile); + assertThat(resolved).isEqualTo(regularFile); + } + + @Test + public void testResolveRealCompactedChangelogFile() { + // Test real compacted changelog file - should return unchanged + Path realFile = + new Path( + "/path/to/table/bucket-0/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253.cc-parquet"); + Path resolved = CompactedChangelogPathResolver.resolveCompactedChangelogPath(realFile); + assertThat(resolved).isEqualTo(realFile); + } + + @Test + public void testResolveFakeCompactedChangelogFile() { + // Test fake compacted changelog file - should resolve to real path + Path fakeFile = + new Path( + "/path/to/table/bucket-1/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253-39253-35699.cc-parquet"); + Path resolved = CompactedChangelogPathResolver.resolveCompactedChangelogPath(fakeFile); + + Path expectedRealFile = + new Path( + "/path/to/table/bucket-0/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253.cc-parquet"); + assertThat(resolved).isEqualTo(expectedRealFile); + } + + @Test + public void testResolveWithDifferentFormats() { + // Test with different file formats + Path fakeOrcFile = + new Path( + "/path/to/table/bucket-2/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-1024-1024-512.cc-orc"); + Path resolvedOrc = + CompactedChangelogPathResolver.resolveCompactedChangelogPath(fakeOrcFile); + Path expectedOrcFile = + new Path( + "/path/to/table/bucket-0/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-1024.cc-orc"); + assertThat(resolvedOrc).isEqualTo(expectedOrcFile); + + Path fakeAvroFile = + new Path( + "/path/to/table/bucket-5/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$2-2048-2048-1024.cc-avro"); + Path resolvedAvro = + CompactedChangelogPathResolver.resolveCompactedChangelogPath(fakeAvroFile); + Path expectedAvroFile = + new Path( + "/path/to/table/bucket-2/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$2-2048.cc-avro"); + assertThat(resolvedAvro).isEqualTo(expectedAvroFile); + } + + @Test + public void testResolveFileWithoutExtension() { + // Test file without file extension - should return unchanged + Path fileWithoutExt = new Path("/path/to/table/file"); + Path resolved = + CompactedChangelogPathResolver.resolveCompactedChangelogPath(fileWithoutExt); + assertThat(resolved).isEqualTo(fileWithoutExt); + } +} From 4964ddb71a4a40e543ac08a2bd42ae1350f7dfc4 Mon Sep 17 00:00:00 2001 From: xiayuxiao Date: Fri, 5 Sep 2025 11:48:30 +0800 Subject: [PATCH 2/4] [core] Fix checkpoint recovery failure for compacted changelog files - fix 1 --- .../utils/CompactedChangelogPathResolver.java | 108 ++++++++++++++---- .../CompactedChangelogPathResolverTest.java | 21 ++++ ...CompactedChangelogFormatReaderFactory.java | 73 +++--------- 3 files changed, 120 insertions(+), 82 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java b/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java index d65c9b070a53..92eb23f279ae 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java @@ -24,44 +24,106 @@ * Utility class for resolving compacted changelog file paths. * *

This class provides functionality to resolve fake compacted changelog file paths to their real - * file paths, following the same protocol as CompactedChangelogFormatReaderFactory. + * file paths. + * + *

File Name Protocol + * + *

There are two kinds of file name. In the following description, bid1 and + * bid2 are bucket id, off is offset, len1 and len2 + * are lengths. + * + *

    + *
  • bucket-bid1/compacted-changelog-xxx$bid1-len1: This is the real file name. If + * this file name is recorded in manifest file meta, reader should read the bytes of this file + * starting from offset 0 with length len1. + *
  • bucket-bid2/compacted-changelog-xxx$bid1-len1-off-len2: This is the fake file + * name. Reader should read the bytes of file + * bucket-bid1/compacted-changelog-xxx$bid1-len1 starting from offset off + * with length len2. + *
*/ public class CompactedChangelogPathResolver { /** - * Resolves compacted changelog file path to its real file path. Handles both real and fake - * compacted changelog file names as described in CompactedChangelogFormatReaderFactory. + * Checks if the given path is a compacted changelog file path. + * + * @param path the file path to check + * @return true if the path is a compacted changelog file, false otherwise + */ + public static boolean isCompactedChangelogPath(Path path) { + return path.getName().startsWith("compacted-changelog-"); + } + + /** + * Resolves a file path, handling compacted changelog file path resolution if applicable. + * + *

For compacted changelog files, resolves fake file paths to their real file paths + * as described in the protocol above. For non-compacted changelog files, returns the + * path unchanged. * * @param path the file path to resolve - * @return the resolved real file path + * @return the resolved real file path for compacted changelog files, or the original path unchanged for other files */ public static Path resolveCompactedChangelogPath(Path path) { - // Check if this is a compacted changelog file by its .cc- extension - String[] nameAndFormat = path.getName().split("\\."); - if (nameAndFormat.length < 2 || !nameAndFormat[1].startsWith("cc-")) { + if (!isCompactedChangelogPath(path)) { return path; } + return decodePath(path).getPath(); + } + /** + * Decodes a compacted changelog file path to extract the real path, offset, and length. + * + * @param path the file path to decode + * @return the decode result containing real path, offset, and length + */ + public static DecodeResult decodePath(Path path) { + String[] nameAndFormat = path.getName().split("\\."); String[] names = nameAndFormat[0].split("\\$"); String[] split = names[1].split("-"); if (split.length == 2) { - // Real file: compacted-changelog-xxx$bid-len.cc-format - return path; + return new DecodeResult(path, 0, Long.parseLong(split[1])); } else { - // Fake file: compacted-changelog-xxx$bid-len-off-len2.cc-format - // Resolve to real file: bucket-bid/compacted-changelog-xxx$bid-len.cc-format - return new Path( - path.getParent().getParent(), - "bucket-" - + split[0] - + "/" - + names[0] - + "$" - + split[0] - + "-" - + split[1] - + "." - + nameAndFormat[1]); + Path realPath = + new Path( + path.getParent().getParent(), + "bucket-" + + split[0] + + "/" + + names[0] + + "$" + + split[0] + + "-" + + split[1] + + "." + + nameAndFormat[1]); + return new DecodeResult(realPath, Long.parseLong(split[2]), Long.parseLong(split[3])); + } + } + + /** Result of decoding a compacted changelog file path. */ + public static class DecodeResult { + + private final Path path; + private final long offset; + private final long length; + + public DecodeResult(Path path, long offset, long length) { + this.path = path; + this.offset = offset; + this.length = length; + } + + public Path getPath() { + return path; + } + + public long getOffset() { + return offset; + } + + public long getLength() { + return length; } } } diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java index 595d66428172..737fb17eb0b9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java @@ -27,6 +27,27 @@ /** Tests for {@link CompactedChangelogPathResolver}. */ public class CompactedChangelogPathResolverTest { + @Test + public void testIsCompactedChangelogPath() { + // Test non-compacted changelog file + Path regularFile = + new Path( + "/path/to/table/bucket-0/changelog-25b05ab0-6f90-4865-a984-8d9629bac735-1426.parquet"); + assertThat(CompactedChangelogPathResolver.isCompactedChangelogPath(regularFile)) + .isFalse(); + + // Test compacted changelog file + Path compactedFile = + new Path( + "/path/to/table/bucket-0/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253.cc-parquet"); + assertThat(CompactedChangelogPathResolver.isCompactedChangelogPath(compactedFile)) + .isTrue(); + + // Test regular data file + Path dataFile = new Path("/path/to/table/bucket-0/data-file-1.parquet"); + assertThat(CompactedChangelogPathResolver.isCompactedChangelogPath(dataFile)).isFalse(); + } + @Test public void testResolveNonCompactedChangelogFile() { // Test regular changelog file - should return unchanged diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java index cd4a1d8c3d87..70d2555cf3a2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java @@ -27,6 +27,7 @@ import org.apache.paimon.fs.PositionOutputStream; import org.apache.paimon.fs.SeekableInputStream; import org.apache.paimon.reader.FileRecordReader; +import org.apache.paimon.utils.CompactedChangelogPathResolver; import org.apache.paimon.utils.RoaringBitmap32; import java.io.EOFException; @@ -35,21 +36,8 @@ /** * {@link FormatReaderFactory} for compacted changelog. * - *

File Name Protocol - * - *

There are two kinds of file name. In the following description, bid1 and - * bid2 are bucket id, off is offset, len1 and len2 - * are lengths. - * - *

    - *
  • bucket-bid1/compacted-changelog-xxx$bid1-len1: This is the real file name. If - * this file name is recorded in manifest file meta, reader should read the bytes of this file - * starting from offset 0 with length len1. - *
  • bucket-bid2/compacted-changelog-xxx$bid1-len1-off-len2: This is the fake file - * name. Reader should read the bytes of file - * bucket-bid1/compacted-changelog-xxx$bid1-len1 starting from offset off - * with length len2. - *
+ *

Uses {@link org.apache.paimon.utils.CompactedChangelogPathResolver} for file name protocol + * handling. */ public class CompactedChangelogFormatReaderFactory implements FormatReaderFactory { @@ -62,7 +50,7 @@ public CompactedChangelogFormatReaderFactory(FormatReaderFactory wrapped) { @Override public FileRecordReader createReader(Context context) throws IOException { OffsetReadOnlyFileIO fileIO = new OffsetReadOnlyFileIO(context.fileIO()); - long length = decodePath(context.filePath()).length; + long length = CompactedChangelogPathResolver.decodePath(context.filePath()).getLength(); return wrapped.createReader( new Context() { @@ -89,43 +77,6 @@ public RoaringBitmap32 selection() { }); } - private static DecodeResult decodePath(Path path) { - String[] nameAndFormat = path.getName().split("\\."); - String[] names = nameAndFormat[0].split("\\$"); - String[] split = names[1].split("-"); - if (split.length == 2) { - return new DecodeResult(path, 0, Long.parseLong(split[1])); - } else { - Path realPath = - new Path( - path.getParent().getParent(), - "bucket-" - + split[0] - + "/" - + names[0] - + "$" - + split[0] - + "-" - + split[1] - + "." - + nameAndFormat[1]); - return new DecodeResult(realPath, Long.parseLong(split[2]), Long.parseLong(split[3])); - } - } - - private static class DecodeResult { - - private final Path path; - private final long offset; - private final long length; - - private DecodeResult(Path path, long offset, long length) { - this.path = path; - this.offset = offset; - this.length = length; - } - } - private static class OffsetReadOnlyFileIO implements FileIO { private final FileIO wrapped; @@ -146,9 +97,12 @@ public void configure(CatalogContext context) { @Override public SeekableInputStream newInputStream(Path path) throws IOException { - DecodeResult result = decodePath(path); + CompactedChangelogPathResolver.DecodeResult result = + CompactedChangelogPathResolver.decodePath(path); return new OffsetSeekableInputStream( - wrapped.newInputStream(result.path), result.offset, result.length); + wrapped.newInputStream(result.getPath()), + result.getOffset(), + result.getLength()); } @Override @@ -159,14 +113,15 @@ public PositionOutputStream newOutputStream(Path path, boolean overwrite) @Override public FileStatus getFileStatus(Path path) throws IOException { - DecodeResult result = decodePath(path); - FileStatus status = wrapped.getFileStatus(result.path); + CompactedChangelogPathResolver.DecodeResult result = + CompactedChangelogPathResolver.decodePath(path); + FileStatus status = wrapped.getFileStatus(result.getPath()); return new FileStatus() { @Override public long getLen() { - return result.length; + return result.getLength(); } @Override @@ -193,7 +148,7 @@ public FileStatus[] listStatus(Path path) throws IOException { @Override public boolean exists(Path path) throws IOException { - return wrapped.exists(decodePath(path).path); + return wrapped.exists(CompactedChangelogPathResolver.decodePath(path).getPath()); } @Override From bda8e14a4f89312e1aff2a0634c069abde5a0e39 Mon Sep 17 00:00:00 2001 From: xiayuxiao Date: Fri, 5 Sep 2025 12:27:27 +0800 Subject: [PATCH 3/4] [core] Fix checkpoint recovery failure for compacted changelog files - fix 2 --- .../org/apache/paimon/utils/CompactedChangelogPathResolver.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java b/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java index 92eb23f279ae..a61682a6b08c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java @@ -56,7 +56,7 @@ public static boolean isCompactedChangelogPath(Path path) { /** * Resolves a file path, handling compacted changelog file path resolution if applicable. - * + * *

For compacted changelog files, resolves fake file paths to their real file paths * as described in the protocol above. For non-compacted changelog files, returns the * path unchanged. From bf8afcde4763d66722ab122a918d4a57721796f4 Mon Sep 17 00:00:00 2001 From: xiayuxiao Date: Fri, 5 Sep 2025 12:32:59 +0800 Subject: [PATCH 4/4] [core] Fix checkpoint recovery failure for compacted changelog files - fix 3 --- .../paimon/utils/CompactedChangelogPathResolver.java | 9 +++++---- .../paimon/utils/CompactedChangelogPathResolverTest.java | 6 ++---- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java b/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java index a61682a6b08c..5263497ecc82 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java @@ -57,12 +57,13 @@ public static boolean isCompactedChangelogPath(Path path) { /** * Resolves a file path, handling compacted changelog file path resolution if applicable. * - *

For compacted changelog files, resolves fake file paths to their real file paths - * as described in the protocol above. For non-compacted changelog files, returns the - * path unchanged. + *

For compacted changelog files, resolves fake file paths to their real file paths as + * described in the protocol above. For non-compacted changelog files, returns the path + * unchanged. * * @param path the file path to resolve - * @return the resolved real file path for compacted changelog files, or the original path unchanged for other files + * @return the resolved real file path for compacted changelog files, or the original path + * unchanged for other files */ public static Path resolveCompactedChangelogPath(Path path) { if (!isCompactedChangelogPath(path)) { diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java index 737fb17eb0b9..f2b7941096a3 100644 --- a/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java @@ -33,15 +33,13 @@ public void testIsCompactedChangelogPath() { Path regularFile = new Path( "/path/to/table/bucket-0/changelog-25b05ab0-6f90-4865-a984-8d9629bac735-1426.parquet"); - assertThat(CompactedChangelogPathResolver.isCompactedChangelogPath(regularFile)) - .isFalse(); + assertThat(CompactedChangelogPathResolver.isCompactedChangelogPath(regularFile)).isFalse(); // Test compacted changelog file Path compactedFile = new Path( "/path/to/table/bucket-0/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253.cc-parquet"); - assertThat(CompactedChangelogPathResolver.isCompactedChangelogPath(compactedFile)) - .isTrue(); + assertThat(CompactedChangelogPathResolver.isCompactedChangelogPath(compactedFile)).isTrue(); // Test regular data file Path dataFile = new Path("/path/to/table/bucket-0/data-file-1.parquet");