Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.tag.TagAutoManager;
import org.apache.paimon.tag.TagTimeExpire;
import org.apache.paimon.utils.CompactedChangelogPathResolver;
import org.apache.paimon.utils.DataFilePathFactories;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.IndexFilePathFactories;
Expand Down Expand Up @@ -300,6 +301,15 @@ private void checkFilesExistence(List<ManifestCommittable> committables) {
}
}

// Resolve compacted changelog files to their real file paths
List<Path> resolvedFiles = new ArrayList<>();
for (Path file : files) {
resolvedFiles.add(CompactedChangelogPathResolver.resolveCompactedChangelogPath(file));
}
// Deduplicate paths as multiple compacted changelog references may resolve to the same
// physical file
resolvedFiles = resolvedFiles.stream().distinct().collect(Collectors.toList());

Predicate<Path> nonExists =
p -> {
try {
Expand All @@ -314,7 +324,7 @@ private void checkFilesExistence(List<ManifestCommittable> committables) {
randomlyExecuteSequentialReturn(
getExecutorService(null),
f -> nonExists.test(f) ? singletonList(f) : emptyList(),
files));
resolvedFiles));

if (!nonExistFiles.isEmpty()) {
String message =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.utils;

import org.apache.paimon.fs.Path;

/**
* Utility class for resolving compacted changelog file paths.
*
* <p>This class provides functionality to resolve fake compacted changelog file paths to their real
* file paths.
*
* <p><b>File Name Protocol</b>
*
* <p>There are two kinds of file name. In the following description, <code>bid1</code> and <code>
* bid2</code> are bucket id, <code>off</code> is offset, <code>len1</code> and <code>len2</code>
* are lengths.
*
* <ul>
* <li><code>bucket-bid1/compacted-changelog-xxx$bid1-len1</code>: This is the real file name. If
* this file name is recorded in manifest file meta, reader should read the bytes of this file
* starting from offset <code>0</code> with length <code>len1</code>.
* <li><code>bucket-bid2/compacted-changelog-xxx$bid1-len1-off-len2</code>: This is the fake file
* name. Reader should read the bytes of file <code>
* bucket-bid1/compacted-changelog-xxx$bid1-len1</code> starting from offset <code>off</code>
* with length <code>len2</code>.
* </ul>
*/
public class CompactedChangelogPathResolver {

/**
* Checks if the given path is a compacted changelog file path.
*
* @param path the file path to check
* @return true if the path is a compacted changelog file, false otherwise
*/
public static boolean isCompactedChangelogPath(Path path) {
return path.getName().startsWith("compacted-changelog-");
}

/**
* Resolves a file path, handling compacted changelog file path resolution if applicable.
*
* <p>For compacted changelog files, resolves fake file paths to their real file paths as
* described in the protocol above. For non-compacted changelog files, returns the path
* unchanged.
*
* @param path the file path to resolve
* @return the resolved real file path for compacted changelog files, or the original path
* unchanged for other files
*/
public static Path resolveCompactedChangelogPath(Path path) {
if (!isCompactedChangelogPath(path)) {
return path;
}
return decodePath(path).getPath();
}

/**
* Decodes a compacted changelog file path to extract the real path, offset, and length.
*
* @param path the file path to decode
* @return the decode result containing real path, offset, and length
*/
public static DecodeResult decodePath(Path path) {
String[] nameAndFormat = path.getName().split("\\.");
String[] names = nameAndFormat[0].split("\\$");
String[] split = names[1].split("-");
if (split.length == 2) {
return new DecodeResult(path, 0, Long.parseLong(split[1]));
} else {
Path realPath =
new Path(
path.getParent().getParent(),
"bucket-"
+ split[0]
+ "/"
+ names[0]
+ "$"
+ split[0]
+ "-"
+ split[1]
+ "."
+ nameAndFormat[1]);
return new DecodeResult(realPath, Long.parseLong(split[2]), Long.parseLong(split[3]));
}
}

/** Result of decoding a compacted changelog file path. */
public static class DecodeResult {

private final Path path;
private final long offset;
private final long length;

public DecodeResult(Path path, long offset, long length) {
this.path = path;
this.offset = offset;
this.length = length;
}

public Path getPath() {
return path;
}

public long getOffset() {
return offset;
}

public long getLength() {
return length;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.io.DataIncrement;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.manifest.ManifestEntry;
Expand All @@ -34,6 +37,7 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.stats.SimpleStats;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
Expand Down Expand Up @@ -438,4 +442,178 @@ public void testExpireForEmptyCommit() throws Exception {
assertThat(snapshotManager.earliestSnapshotId()).isEqualTo(5);
assertThat(snapshotManager.latestSnapshotId()).isEqualTo(6);
}

@Test
public void testRecoverCompactedChangelogFiles() throws Exception {
String path = tempDir.toString();
RowType rowType =
RowType.of(
new DataType[] {DataTypes.INT(), DataTypes.BIGINT()},
new String[] {"k", "v"});

Options options = new Options();
options.set(CoreOptions.PATH, path);
options.set(CoreOptions.BUCKET, 3);
TableSchema tableSchema =
SchemaUtils.forceCommit(
new SchemaManager(LocalFileIO.create(), new Path(path)),
new Schema(
rowType.getFields(),
Collections.emptyList(),
Collections.singletonList("k"),
options.toMap(),
""));

FileStoreTable table =
FileStoreTableFactory.create(
LocalFileIO.create(),
new Path(path),
tableSchema,
CatalogEnvironment.empty());

// Create fake compacted changelog files that should resolve to real files
String realChangelogFile =
"compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253.cc-parquet";
String fakeChangelogFile1 =
"compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253-39253-35699.cc-parquet";
String fakeChangelogFile2 =
"compacted-changelog-8e049c65-5ce4-4ce7-b1b0-78ce694ab351$0-39253-74952-37725.cc-parquet";

// Create directory structure
Path bucket0Dir = new Path(path, "bucket-0");
Path bucket1Dir = new Path(path, "bucket-1");
Path bucket2Dir = new Path(path, "bucket-2");
LocalFileIO.create().mkdirs(bucket0Dir);
LocalFileIO.create().mkdirs(bucket1Dir);
LocalFileIO.create().mkdirs(bucket2Dir);

// Create the real compacted changelog file
Path realFilePath = new Path(bucket0Dir, realChangelogFile);
LocalFileIO.create().newOutputStream(realFilePath, false).close();

DataFileMeta realFileMeta =
DataFileMeta.forAppend(
realChangelogFile,
3000L,
300L,
SimpleStats.EMPTY_STATS,
0L,
0L,
1L,
Collections.emptyList(),
null,
null,
null,
null,
null,
null);

// Create fake DataFileMeta for compacted changelog files
DataFileMeta fakeFileMeta1 =
DataFileMeta.forAppend(
fakeChangelogFile1,
1000L,
100L,
SimpleStats.EMPTY_STATS,
0L,
0L,
1L,
Collections.emptyList(),
null,
null,
null,
null,
null,
null);

DataFileMeta fakeFileMeta2 =
DataFileMeta.forAppend(
fakeChangelogFile2,
2000L,
200L,
SimpleStats.EMPTY_STATS,
0L,
0L,
1L,
Collections.emptyList(),
null,
null,
null,
null,
null,
null);

// Create commit message with fake compacted changelog files
BinaryRow partition = BinaryRow.EMPTY_ROW;
CommitMessageImpl commitMessage0 =
new CommitMessageImpl(
partition,
0,
3,
new DataIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.singletonList(realFileMeta)),
new CompactIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()));
CommitMessageImpl commitMessage1 =
new CommitMessageImpl(
partition,
1,
3,
new DataIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.singletonList(fakeFileMeta1)),
new CompactIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()));
CommitMessageImpl commitMessage2 =
new CommitMessageImpl(
partition,
2,
3,
new DataIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.singletonList(fakeFileMeta2)),
new CompactIncrement(
Collections.emptyList(),
Collections.emptyList(),
Collections.emptyList()));

ManifestCommittable committable = new ManifestCommittable(1L);
committable.addFileCommittable(commitMessage0);
committable.addFileCommittable(commitMessage1);
committable.addFileCommittable(commitMessage2);

String commitUser = UUID.randomUUID().toString();
try (TableCommitImpl commit = table.newCommit(commitUser)) {
// This should succeed because fake files resolve to the existing real file
commit.filterAndCommitMultiple(Collections.singletonList(committable), false);
}

// Now delete the real file and test that the check fails
LocalFileIO.create().delete(realFilePath, false);

// Create a new committable with a larger identifier to simulate recovery from checkpoint
// This identifier must be larger than the previously committed identifier (1L)
ManifestCommittable newCommittable = new ManifestCommittable(2L);
newCommittable.addFileCommittable(commitMessage0);
newCommittable.addFileCommittable(commitMessage1);
newCommittable.addFileCommittable(commitMessage2);

try (TableCommitImpl commit = table.newCommit(commitUser)) {
assertThatThrownBy(
() ->
commit.filterAndCommitMultiple(
Collections.singletonList(newCommittable), false))
.hasMessageContaining(
"Cannot recover from this checkpoint because some files in the"
+ " snapshot that need to be resubmitted have been deleted");
}
}
}
Loading
Loading