diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index c1a69a239163..95b2df34f513 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -33,6 +33,7 @@
 import org.apache.paimon.tag.TagAutoCreation;
 import org.apache.paimon.tag.TagAutoManager;
 import org.apache.paimon.tag.TagTimeExpire;
+import org.apache.paimon.utils.CompactedChangelogPathResolver;
 import org.apache.paimon.utils.DataFilePathFactories;
 import org.apache.paimon.utils.ExecutorThreadFactory;
 import org.apache.paimon.utils.IndexFilePathFactories;
@@ -300,6 +301,15 @@ private void checkFilesExistence(List committables) {
             }
         }
 
+        // Resolve compacted changelog files to their real file paths
+        List<Path> resolvedFiles = new ArrayList<>();
+        for (Path file : files) {
+            resolvedFiles.add(CompactedChangelogPathResolver.resolveCompactedChangelogPath(file));
+        }
+        // Deduplicate paths as multiple compacted changelog references may resolve to the same
+        // physical file
+        resolvedFiles = resolvedFiles.stream().distinct().collect(Collectors.toList());
+
         Predicate<Path> nonExists =
                 p -> {
                     try {
@@ -314,7 +324,7 @@ private void checkFilesExistence(List committables) {
                         randomlyExecuteSequentialReturn(
                                 getExecutorService(null),
                                 f -> nonExists.test(f) ? singletonList(f) : emptyList(),
-                                files));
+                                resolvedFiles));
 
         if (!nonExistFiles.isEmpty()) {
             String message =
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java b/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java
new file mode 100644
index 000000000000..5263497ecc82
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/CompactedChangelogPathResolver.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.fs.Path;
+
+/**
+ * Utility class for resolving compacted changelog file paths.
+ *
+ * <p>This class provides functionality to resolve fake compacted changelog file paths to their real
+ * file paths.
+ *
+ * <p><b>File Name Protocol</b>
+ *
+ * <p>There are two kinds of file name. In the following description, <code>bid1</code> and <code>
+ * bid2</code> are bucket id, <code>off</code> is offset, <code>len1</code> and <code>len2</code>
+ * are lengths.
+ *
+ * <ul>
+ *   <li><code>bucket-bid1/compacted-changelog-xxx$bid1-len1</code>: This is the real file name. If
+ *       this file name is recorded in manifest file meta, reader should read the bytes of this file
+ *       starting from offset <code>0</code> with length <code>len1</code>.
+ *   <li><code>bucket-bid2/compacted-changelog-xxx$bid1-len1-off-len2</code>: This is the fake file
+ *       name. Reader should read the bytes of file <code>
+ *       bucket-bid1/compacted-changelog-xxx$bid1-len1</code> starting from offset <code>off</code>
+ *       with length <code>len2</code>.
+ * </ul>
+ */
+public final class CompactedChangelogPathResolver {
+
+    private static final String COMPACTED_CHANGELOG_PREFIX = "compacted-changelog-";
+
+    private CompactedChangelogPathResolver() {}
+
+    /**
+     * Checks if the given path is a compacted changelog file path.
+     *
+     * @param path the file path to check
+     * @return true if the path is a compacted changelog file, false otherwise
+     */
+    public static boolean isCompactedChangelogPath(Path path) {
+        return path.getName().startsWith(COMPACTED_CHANGELOG_PREFIX);
+    }
+
+    /**
+     * Resolves a file path, handling compacted changelog file path resolution if applicable.
+     *
+     * <p>For compacted changelog files, resolves fake file paths to their real file paths as
+     * described in the protocol above. For non-compacted changelog files, returns the path
+     * unchanged.
+     *
+     * @param path the file path to resolve
+     * @return the resolved real file path for compacted changelog files, or the original path
+     *     unchanged for other files
+     * @throws IllegalArgumentException if the path has the compacted changelog prefix but its name
+     *     does not follow the file name protocol
+     */
+    public static Path resolveCompactedChangelogPath(Path path) {
+        if (!isCompactedChangelogPath(path)) {
+            return path;
+        }
+        return decodePath(path).getPath();
+    }
+
+    /**
+     * Decodes a compacted changelog file path to extract the real path, offset, and length.
+     *
+     * <p>The caller is responsible for passing a compacted changelog path; this method does not
+     * check the {@code compacted-changelog-} prefix itself.
+     *
+     * @param path the file path to decode
+     * @return the decode result containing real path, offset, and length
+     * @throws IllegalArgumentException if the file name does not follow the file name protocol
+     */
+    public static DecodeResult decodePath(Path path) {
+        String fileName = path.getName();
+        // Split at the first '.' only, so that the whole format suffix is carried over to the real
+        // file name unchanged.
+        int dot = fileName.indexOf('.');
+        String baseName = dot < 0 ? fileName : fileName.substring(0, dot);
+        String suffix = dot < 0 ? "" : fileName.substring(dot);
+
+        String[] names = baseName.split("\\$");
+        if (names.length < 2) {
+            throw new IllegalArgumentException("Malformed compacted changelog file path: " + path);
+        }
+
+        String[] split = names[1].split("-");
+        if (split.length == 2) {
+            // Real file name: bid1-len1
+            return new DecodeResult(path, 0, Long.parseLong(split[1]));
+        }
+        if (split.length < 4) {
+            throw new IllegalArgumentException("Malformed compacted changelog file path: " + path);
+        }
+
+        // Fake file name: bid1-len1-off-len2, the real file lives in bucket bid1
+        String realName = names[0] + "$" + split[0] + "-" + split[1] + suffix;
+        Path realPath =
+                new Path(path.getParent().getParent(), "bucket-" + split[0] + "/" + realName);
+        return new DecodeResult(realPath, Long.parseLong(split[2]), Long.parseLong(split[3]));
+    }
+
+    /** Result of decoding a compacted changelog file path. */
+    public static class DecodeResult {
+
+        private final Path path;
+        private final long offset;
+        private final long length;
+
+        public DecodeResult(Path path, long offset, long length) {
+            this.path = path;
+            this.offset = offset;
+            this.length = length;
+        }
+
+        public Path getPath() {
+            return path;
+        }
+
+        public long getOffset() {
+            return offset;
+        }
+
+        public long getLength() {
+            return length;
+        }
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
index f8cb17bd91c8..0a39ca7ef8b4 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/TableCommitTest.java
@@ -24,7 +24,10 @@
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.manifest.ManifestCommittable;
 import org.apache.paimon.manifest.ManifestEntry;
@@ -34,6 +37,7 @@
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.SchemaUtils;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.stats.SimpleStats;
 import org.apache.paimon.table.CatalogEnvironment;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
@@ -438,4 +442,178 @@ public void testExpireForEmptyCommit() throws Exception {
         assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(5);
         assertThat(snapshotManager.latestSnapshotId()).isEqualTo(6);
     }
+
+    @Test
+    public void testRecoverCompactedChangelogFiles() throws Exception {
+        String path = tempDir.toString();
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
+                        new String[] {"k", "v"});
+
+        Options options = new Options();
+        options.set(CoreOptions.PATH, path);
+        options.set(CoreOptions.BUCKET, 3);
+        TableSchema tableSchema =
+                SchemaUtils.forceCommit(
+                        new SchemaManager(LocalFileIO.create(), new Path(path)),
+                        new Schema(
+                                rowType.getFields(),
+                                Collections.emptyList(),
+                                Collections.singletonList("k"),
+                                options.toMap(),
+                                ""));
+
+        FileStoreTable table =
+                FileStoreTableFactory.create(
+                        LocalFileIO.create(),
+                        new Path(path),
+                        tableSchema,
+                        CatalogEnvironment.empty());
+
+        // Create fake compacted changelog files that should resolve to real files
+        String realChangelogFile =
+                "compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253.cc-parquet";
+        String fakeChangelogFile1 =
+                "compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253-39253-35699.cc-parquet";
+        String fakeChangelogFile2 =
+                "compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253-74952-37725.cc-parquet";
+
+        // Create directory structure
+        Path bucket0Dir = new Path(path, "bucket-0");
+        Path bucket1Dir = new Path(path, "bucket-1");
+        Path bucket2Dir = new Path(path, "bucket-2");
+        LocalFileIO.create().mkdirs(bucket0Dir);
+        LocalFileIO.create().mkdirs(bucket1Dir);
+        LocalFileIO.create().mkdirs(bucket2Dir);
+
+        // Create the real compacted changelog file
+        Path realFilePath = new Path(bucket0Dir, realChangelogFile);
+        LocalFileIO.create().newOutputStream(realFilePath, false).close();
+
+        DataFileMeta realFileMeta =
+                DataFileMeta.forAppend(
+                        realChangelogFile,
+                        3000L,
+                        300L,
+                        SimpleStats.EMPTY_STATS,
+                        0L,
+                        0L,
+                        1L,
+                        Collections.emptyList(),
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        // Create fake DataFileMeta for compacted changelog files
+        DataFileMeta fakeFileMeta1 =
+                DataFileMeta.forAppend(
+                        fakeChangelogFile1,
+                        1000L,
+                        100L,
+                        SimpleStats.EMPTY_STATS,
+                        0L,
+                        0L,
+                        1L,
+                        Collections.emptyList(),
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        DataFileMeta fakeFileMeta2 =
+                DataFileMeta.forAppend(
+                        fakeChangelogFile2,
+                        2000L,
+                        200L,
+                        SimpleStats.EMPTY_STATS,
+                        0L,
+                        0L,
+                        1L,
+                        Collections.emptyList(),
+                        null,
+                        null,
+                        null,
+                        null,
+                        null,
+                        null);
+
+        // Create commit message with fake compacted changelog files
+        BinaryRow partition = BinaryRow.EMPTY_ROW;
+        CommitMessageImpl commitMessage0 =
+                new CommitMessageImpl(
+                        partition,
+                        0,
+                        3,
+                        new DataIncrement(
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.singletonList(realFileMeta)),
+                        new CompactIncrement(
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.emptyList()));
+        CommitMessageImpl commitMessage1 =
+                new CommitMessageImpl(
+                        partition,
+                        1,
+                        3,
+                        new DataIncrement(
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.singletonList(fakeFileMeta1)),
+                        new CompactIncrement(
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.emptyList()));
+        CommitMessageImpl commitMessage2 =
+                new CommitMessageImpl(
+                        partition,
+                        2,
+                        3,
+                        new DataIncrement(
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.singletonList(fakeFileMeta2)),
+                        new CompactIncrement(
+                                Collections.emptyList(),
+                                Collections.emptyList(),
+                                Collections.emptyList()));
+
+        ManifestCommittable committable = new ManifestCommittable(1L);
+        committable.addFileCommittable(commitMessage0);
+        committable.addFileCommittable(commitMessage1);
+        committable.addFileCommittable(commitMessage2);
+
+        String commitUser = UUID.randomUUID().toString();
+        try (TableCommitImpl commit = table.newCommit(commitUser)) {
+            // This should succeed because fake files resolve to the existing real file
+            commit.filterAndCommitMultiple(Collections.singletonList(committable), false);
+        }
+
+        // Now delete the real file and test that the check fails
+        LocalFileIO.create().delete(realFilePath, false);
+
+        // Create a new committable with a larger identifier to simulate recovery from checkpoint
+        // This identifier must be larger than the previously committed identifier (1L)
+        ManifestCommittable newCommittable = new ManifestCommittable(2L);
+        newCommittable.addFileCommittable(commitMessage0);
+        newCommittable.addFileCommittable(commitMessage1);
+        newCommittable.addFileCommittable(commitMessage2);
+
+        try (TableCommitImpl commit = table.newCommit(commitUser)) {
+            assertThatThrownBy(
+                            () ->
+                                    commit.filterAndCommitMultiple(
+                                            Collections.singletonList(newCommittable), false))
+                    .hasMessageContaining(
+                            "Cannot recover from this checkpoint because some files in the"
+                                    + " snapshot that need to be resubmitted have been deleted");
+        }
+    }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java b/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java
new file mode 100644
index 000000000000..f2b7941096a3
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/CompactedChangelogPathResolverTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.fs.Path;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link CompactedChangelogPathResolver}. */
+public class CompactedChangelogPathResolverTest {
+
+    @Test
+    public void testIsCompactedChangelogPath() {
+        // Test non-compacted changelog file
+        Path regularFile =
+                new Path(
+                        "/path/to/table/bucket-0/changelog-25b05ab0-6f90-4865-a984-8d9629bac735-1426.parquet");
+        assertThat(CompactedChangelogPathResolver.isCompactedChangelogPath(regularFile)).isFalse();
+
+        // Test compacted changelog file
+        Path compactedFile =
+                new Path(
+                        "/path/to/table/bucket-0/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253.cc-parquet");
+        assertThat(CompactedChangelogPathResolver.isCompactedChangelogPath(compactedFile)).isTrue();
+
+        // Test regular data file
+        Path dataFile = new Path("/path/to/table/bucket-0/data-file-1.parquet");
+        assertThat(CompactedChangelogPathResolver.isCompactedChangelogPath(dataFile)).isFalse();
+    }
+
+    @Test
+    public void testResolveNonCompactedChangelogFile() {
+        // Test regular changelog file - should return unchanged
+        Path regularFile =
+                new Path(
+                        "/path/to/table/bucket-0/changelog-25b05ab0-6f90-4865-a984-8d9629bac735-1426.parquet");
+        Path resolved = CompactedChangelogPathResolver.resolveCompactedChangelogPath(regularFile);
+        assertThat(resolved).isEqualTo(regularFile);
+    }
+
+    @Test
+    public void testResolveRealCompactedChangelogFile() {
+        // Test real compacted changelog file - should return unchanged
+        Path realFile =
+                new Path(
+                        "/path/to/table/bucket-0/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253.cc-parquet");
+        Path resolved = CompactedChangelogPathResolver.resolveCompactedChangelogPath(realFile);
+        assertThat(resolved).isEqualTo(realFile);
+    }
+
+    @Test
+    public void testResolveFakeCompactedChangelogFile() {
+        // Test fake compacted changelog file - should resolve to real path
+        Path fakeFile =
+                new Path(
+                        "/path/to/table/bucket-1/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253-39253-35699.cc-parquet");
+        Path resolved = CompactedChangelogPathResolver.resolveCompactedChangelogPath(fakeFile);
+
+        Path expectedRealFile =
+                new Path(
+                        "/path/to/table/bucket-0/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253.cc-parquet");
+        assertThat(resolved).isEqualTo(expectedRealFile);
+    }
+
+    @Test
+    public void testResolveWithDifferentFormats() {
+        // Test with different file formats
+        Path fakeOrcFile =
+                new Path(
+                        "/path/to/table/bucket-2/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-1024-1024-512.cc-orc");
+        Path resolvedOrc =
+                CompactedChangelogPathResolver.resolveCompactedChangelogPath(fakeOrcFile);
+        Path expectedOrcFile =
+                new Path(
+                        "/path/to/table/bucket-0/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-1024.cc-orc");
+        assertThat(resolvedOrc).isEqualTo(expectedOrcFile);
+
+        Path fakeAvroFile =
+                new Path(
+                        "/path/to/table/bucket-5/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$2-2048-2048-1024.cc-avro");
+        Path resolvedAvro =
+                CompactedChangelogPathResolver.resolveCompactedChangelogPath(fakeAvroFile);
+        Path expectedAvroFile =
+                new Path(
+                        "/path/to/table/bucket-2/compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$2-2048.cc-avro");
+        assertThat(resolvedAvro).isEqualTo(expectedAvroFile);
+    }
+
+    @Test
+    public void testResolveFileWithoutExtension() {
+        // Test file without file extension - should return unchanged
+        Path fileWithoutExt = new Path("/path/to/table/file");
+        Path resolved =
+                CompactedChangelogPathResolver.resolveCompactedChangelogPath(fileWithoutExt);
+        assertThat(resolved).isEqualTo(fileWithoutExt);
+    }
+
+    @Test
+    public void testResolveMalformedCompactedChangelogFile() {
+        // Test compacted changelog prefix without the '$' separated part - should be rejected
+        Path malformedFile =
+                new Path("/path/to/table/bucket-0/compacted-changelog-8e049c65.cc-parquet");
+        assertThatThrownBy(
+                        () ->
+                                CompactedChangelogPathResolver.resolveCompactedChangelogPath(
+                                        malformedFile))
+                .isInstanceOf(IllegalArgumentException.class);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
index cd4a1d8c3d87..70d2555cf3a2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/changelog/format/CompactedChangelogFormatReaderFactory.java
@@ -27,6 +27,7 @@
 import org.apache.paimon.fs.PositionOutputStream;
 import org.apache.paimon.fs.SeekableInputStream;
 import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.utils.CompactedChangelogPathResolver;
 import org.apache.paimon.utils.RoaringBitmap32;
 
 import java.io.EOFException;
@@ -35,21 +36,8 @@
 /**
  * {@link FormatReaderFactory} for compacted changelog.
  *
- * <p><b>File Name Protocol</b>
- *
- * <p>There are two kinds of file name. In the following description, <code>bid1</code> and <code>
- * bid2</code> are bucket id, <code>off</code> is offset, <code>len1</code> and <code>len2</code>
- * are lengths.
- *
- * <ul>
- *   <li><code>bucket-bid1/compacted-changelog-xxx$bid1-len1</code>: This is the real file name. If
- *       this file name is recorded in manifest file meta, reader should read the bytes of this file
- *       starting from offset <code>0</code> with length <code>len1</code>.
- *   <li><code>bucket-bid2/compacted-changelog-xxx$bid1-len1-off-len2</code>: This is the fake file
- *       name. Reader should read the bytes of file <code>
- *       bucket-bid1/compacted-changelog-xxx$bid1-len1</code> starting from offset <code>off</code>
- *       with length <code>len2</code>.
- * </ul>
+ * <p>Uses {@link org.apache.paimon.utils.CompactedChangelogPathResolver} for file name protocol
+ * handling.
  */
 public class CompactedChangelogFormatReaderFactory implements FormatReaderFactory {
 
@@ -62,7 +50,7 @@ public CompactedChangelogFormatReaderFactory(FormatReaderFactory wrapped) {
     @Override
     public FileRecordReader createReader(Context context) throws IOException {
         OffsetReadOnlyFileIO fileIO = new OffsetReadOnlyFileIO(context.fileIO());
-        long length = decodePath(context.filePath()).length;
+        long length = CompactedChangelogPathResolver.decodePath(context.filePath()).getLength();
 
         return wrapped.createReader(
                 new Context() {
@@ -89,43 +77,6 @@ public RoaringBitmap32 selection() {
                 });
     }
 
-    private static DecodeResult decodePath(Path path) {
-        String[] nameAndFormat = path.getName().split("\\.");
-        String[] names = nameAndFormat[0].split("\\$");
-        String[] split = names[1].split("-");
-        if (split.length == 2) {
-            return new DecodeResult(path, 0, Long.parseLong(split[1]));
-        } else {
-            Path realPath =
-                    new Path(
-                            path.getParent().getParent(),
-                            "bucket-"
-                                    + split[0]
-                                    + "/"
-                                    + names[0]
-                                    + "$"
-                                    + split[0]
-                                    + "-"
-                                    + split[1]
-                                    + "."
-                                    + nameAndFormat[1]);
-            return new DecodeResult(realPath, Long.parseLong(split[2]), Long.parseLong(split[3]));
-        }
-    }
-
-    private static class DecodeResult {
-
-        private final Path path;
-        private final long offset;
-        private final long length;
-
-        private DecodeResult(Path path, long offset, long length) {
-            this.path = path;
-            this.offset = offset;
-            this.length = length;
-        }
-    }
-
     private static class OffsetReadOnlyFileIO implements FileIO {
 
         private final FileIO wrapped;
@@ -146,9 +97,12 @@ public void configure(CatalogContext context) {
 
         @Override
         public SeekableInputStream newInputStream(Path path) throws IOException {
-            DecodeResult result = decodePath(path);
+            CompactedChangelogPathResolver.DecodeResult result =
+                    CompactedChangelogPathResolver.decodePath(path);
             return new OffsetSeekableInputStream(
-                    wrapped.newInputStream(result.path), result.offset, result.length);
+                    wrapped.newInputStream(result.getPath()),
+                    result.getOffset(),
+                    result.getLength());
         }
 
         @Override
@@ -159,14 +113,15 @@ public PositionOutputStream newOutputStream(Path path, boolean overwrite)
 
         @Override
         public FileStatus getFileStatus(Path path) throws IOException {
-            DecodeResult result = decodePath(path);
-            FileStatus status = wrapped.getFileStatus(result.path);
+            CompactedChangelogPathResolver.DecodeResult result =
+                    CompactedChangelogPathResolver.decodePath(path);
+            FileStatus status = wrapped.getFileStatus(result.getPath());
 
             return new FileStatus() {
 
                 @Override
                 public long getLen() {
-                    return result.length;
+                    return result.getLength();
                 }
 
                 @Override
@@ -193,7 +148,7 @@ public FileStatus[] listStatus(Path path) throws IOException {
 
         @Override
         public boolean exists(Path path) throws IOException {
-            return wrapped.exists(CompactedChangelogPathResolver.decodePath(path).getPath());
+            return wrapped.exists(CompactedChangelogPathResolver.decodePath(path).getPath());
         }
 
         @Override