diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index e55e41e3d92e..4d1ad27fbdcf 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -344,6 +344,31 @@ All available procedures are listed below.
CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => false, parallelism => '5', mode => 'local')
+
| repair |
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java b/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java
new file mode 100644
index 000000000000..a704a329c71a
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.utils.FileStorePathFactory;
+import org.apache.paimon.utils.ThreadPoolUtils;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadPoolExecutor;
+
+import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
+
+/** Lists data files that are recorded in manifests but missing from the filesystem. */
+public class ListUnexistingFiles {
+
+ private final FileStoreTable table;
+ private final FileStorePathFactory pathFactory;
+ private final ThreadPoolExecutor executor;
+
+ public ListUnexistingFiles(FileStoreTable table) {
+ this.table = table;
+ this.pathFactory = table.store().pathFactory();
+ this.executor =
+ createCachedThreadPool(
+ table.coreOptions().deleteFileThreadNum(), "LIST_UNEXISTING_FILES");
+ }
+
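+ /**
+ * Returns, for each bucket of the given partition, a map from data file path to its
+ * {@link DataFileMeta} for files that are recorded in manifests but no longer exist on the
+ * filesystem.
+ */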
+ public Map<Integer, Map<String, DataFileMeta>> list(BinaryRow partition) throws Exception {
+ Map<Integer, Map<String, DataFileMeta>> result = new HashMap<>();
+ List<Split> splits =
+ table.newScan()
+ .withPartitionFilter(Collections.singletonList(partition))
+ .plan()
+ .splits();
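+ // Check the data files of each split on the shared thread pool and collect
+ // results as the worker threads finish.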
+ Iterator<ListResult> it =
+ ThreadPoolUtils.randomlyExecuteSequentialReturn(
+ executor, split -> listFilesInDataSplit((DataSplit) split), splits);
+ while (it.hasNext()) {
+ ListResult item = it.next();
+ result.computeIfAbsent(item.bucket, k -> new HashMap<>()).put(item.path, item.meta);
+ }
+ return result;
+ }
+
+ private List<ListResult> listFilesInDataSplit(DataSplit dataSplit) {
+ List<ListResult> results = new ArrayList<>();
+ DataFilePathFactory dataFilePathFactory =
+ pathFactory.createDataFilePathFactory(dataSplit.partition(), dataSplit.bucket());
+ for (DataFileMeta meta : dataSplit.dataFiles()) {
+ Path path = dataFilePathFactory.toPath(meta);
+ try {
+ if (!table.fileIO().exists(path)) {
+ results.add(new ListResult(dataSplit.bucket(), path.toString(), meta));
+ }
+ } catch (IOException e) {
+ throw new UncheckedIOException("Cannot determine if file " + path + " exists.", e);
+ }
+ }
+ return results;
+ }
+
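+ /** A data file recorded in the manifest but missing from the filesystem. */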
+ private static class ListResult {
+
+ private final int bucket;
+ private final String path;
+ private final DataFileMeta meta;
+
+ private ListResult(int bucket, String path, DataFileMeta meta) {
+ this.bucket = bucket;
+ this.path = path;
+ this.meta = meta;
+ }
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ListUnexistingFilesTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ListUnexistingFilesTest.java
new file mode 100644
index 000000000000..7936aa517349
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/ListUnexistingFilesTest.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.sink.TableWriteImpl;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link ListUnexistingFiles}. */
+public class ListUnexistingFilesTest {
+
+ @TempDir java.nio.file.Path tempDir;
+
+ @ParameterizedTest
+ @ValueSource(ints = {-1, 3})
+ public void testListFiles(int bucket) throws Exception {
+ int numPartitions = 2;
+ int numFiles = 10;
+ int[] numDeletes = new int[numPartitions];
+ FileStoreTable table =
+ prepareRandomlyDeletedTable(
+ tempDir.toString(), "mydb", "t", bucket, numFiles, numDeletes);
+
+ Function<Integer, BinaryRow> binaryRow =
+ i -> {
+ BinaryRow b = new BinaryRow(1);
+ BinaryRowWriter writer = new BinaryRowWriter(b);
+ writer.writeInt(0, i);
+ writer.complete();
+ return b;
+ };
+
+ ListUnexistingFiles operation = new ListUnexistingFiles(table);
+ for (int i = 0; i < numPartitions; i++) {
+ Map<Integer, Map<String, DataFileMeta>> result = operation.list(binaryRow.apply(i));
+ assertThat(result.values().stream().mapToInt(Map::size).sum()).isEqualTo(numDeletes[i]);
+ }
+ }
+
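+ /**
+ * Creates a partitioned table, writes {@code numFiles} data files to each partition, then
+ * randomly deletes some of these files directly from the filesystem. The number of files
+ * deleted in each partition is recorded in {@code numDeletes}.
+ */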
+ public static FileStoreTable prepareRandomlyDeletedTable(
+ String warehouse,
+ String databaseName,
+ String tableName,
+ int bucket,
+ int numFiles,
+ int[] numDeletes)
+ throws Exception {
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()},
+ new String[] {"pt", "id", "v"});
+ Map<String, String> options = new HashMap<>();
+ options.put(CoreOptions.BUCKET.key(), String.valueOf(bucket));
+ options.put(CoreOptions.WRITE_ONLY.key(), "true");
+ if (bucket > 0) {
+ options.put(CoreOptions.BUCKET_KEY.key(), "id");
+ }
+ FileStoreTable table =
+ createPaimonTable(
+ warehouse,
+ databaseName,
+ tableName,
+ rowType,
+ Collections.singletonList("pt"),
+ options);
+
+ String commitUser = UUID.randomUUID().toString();
+ TableWriteImpl<?> write = table.newWrite(commitUser);
+ TableCommitImpl commit = table.newCommit(commitUser);
+
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ int numPartitions = numDeletes.length;
+ for (int i = 0; i < numPartitions; i++) {
+ numDeletes[i] = random.nextInt(0, numFiles + 1);
+ }
+
+ int identifier = 0;
+ for (int i = 0; i < numPartitions; i++) {
+ for (int j = 0; j < numFiles; j++) {
+ write.write(GenericRow.of(i, random.nextInt(), random.nextLong()));
+ identifier++;
+ commit.commit(identifier, write.prepareCommit(false, identifier));
+ }
+ }
+
+ write.close();
+ commit.close();
+
+ for (int i = 0; i < numPartitions; i++) {
+ LocalFileIO fileIO = LocalFileIO.create();
+ List<Path> paths = new ArrayList<>();
+ for (int j = 0; j < Math.max(1, bucket); j++) {
+ Path path = new Path(table.location(), "pt=" + i + "/bucket-" + j);
+ paths.addAll(
+ Arrays.stream(fileIO.listStatus(path))
+ .map(FileStatus::getPath)
+ .collect(Collectors.toList()));
+ }
+ Collections.shuffle(paths);
+ for (int j = 0; j < numDeletes[i]; j++) {
+ fileIO.deleteQuietly(paths.get(j));
+ }
+ }
+
+ return table;
+ }
+
+ private static FileStoreTable createPaimonTable(
+ String warehouse,
+ String databaseName,
+ String tableName,
+ RowType rowType,
+ List<String> partitionKeys,
+ Map<String, String> customOptions)
+ throws Exception {
+ LocalFileIO fileIO = LocalFileIO.create();
+ Path path = new Path(warehouse);
+
+ Schema schema =
+ new Schema(
+ rowType.getFields(),
+ partitionKeys,
+ Collections.emptyList(),
+ customOptions,
+ "");
+
+ try (FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path)) {
+ paimonCatalog.createDatabase(databaseName, true);
+ Identifier paimonIdentifier = Identifier.create(databaseName, tableName);
+ paimonCatalog.createTable(paimonIdentifier, schema, false);
+ return (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+ }
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java
new file mode 100644
index 000000000000..8d014d66ee02
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.flink.sink.CommittableTypeInfo;
+import org.apache.paimon.flink.utils.BoundedOneInputOperator;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.operation.ListUnexistingFiles;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * Action to remove unexisting data files from manifest entries. It has the following use cases:
+ *
+ * <ul>
+ *   <li>There is currently a known case where an unexisting data file might be written into the
+ *       manifest. Consider a write-only job (W) plus a dedicated compaction job (C):
+ *       <ol>
+ *         <li>W commits a snapshot with file F. Then W constantly fails and restarts, each time
+ *             before we can retry the commit (or W is stopped, creating Flink savepoint S).
+ *         <li>C compacts F into a larger file, so F is now deleted from the manifest.
+ *         <li>Before the compact snapshot expires, but after all snapshots created by W expire,
+ *             W comes back to normal (or is restarted from savepoint S). As W cannot find its
+ *             previous snapshot, it assumes that this snapshot has not been committed (see {@link
+ *             org.apache.paimon.operation.FileStoreCommitImpl#filterCommitted} for more detail),
+ *             so file F is committed to the manifest once again.
+ *         <li>When the compact snapshot expires, file F will be deleted from the file system. Now
+ *             F is in the manifest, but not on the file system. In this situation, the user might
+ *             want to remove F from the manifest to continue reading the table.
+ *       </ol>
+ *   <li>A user deletes a data file by mistake (for example, by incorrectly setting the time
+ *       threshold for orphan file cleaning). If the user can tolerate skipping some records when
+ *       consuming this table, they can also use this action to remove the file from the manifest.
+ * </ul>
+ *
+ * <p>Note that users run this action at their own risk: it may cause data loss when used outside
+ * of the use cases above.
+ */
+public class RemoveUnexistingFilesAction extends TableActionBase {
+
+ private static final OutputTag<String> RESULT_SIDE_OUTPUT =
+ new OutputTag<>("result-side-output", BasicTypeInfo.STRING_TYPE_INFO);
+
+ private boolean dryRun = false;
+ @Nullable private Integer parallelism = null;
+
+ public RemoveUnexistingFilesAction(
+ String databaseName, String tableName, Map<String, String> catalogConfig) {
+ super(databaseName, tableName, catalogConfig);
+ }
+
+ public RemoveUnexistingFilesAction dryRun() {
+ this.dryRun = true;
+ return this;
+ }
+
+ public RemoveUnexistingFilesAction withParallelism(int parallelism) {
+ this.parallelism = parallelism;
+ return this;
+ }
+
+ @Override
+ public void build() throws Exception {
+ buildDataStream();
+ }
+
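+ /**
+ * Builds the job topology: a single-parallelism source emits one record per partition,
+ * worker operators list the missing files of each partition, and (unless this is a dry run)
+ * a single-parallelism committer removes the corresponding manifest entries in one commit.
+ * The returned stream contains the paths of the affected files.
+ */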
+ public DataStream<String> buildDataStream() throws Exception {
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ List<BinaryRow> binaryPartitions = fileStoreTable.newScan().listPartitions();
+
+ SingleOutputStreamOperator<byte[]> source =
+ env.fromData(
+ binaryPartitions.stream()
+ .map(BinaryRow::toBytes)
+ .collect(Collectors.toList()),
+ PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO)
+ .name("Remove Unexisting Files Source")
+ .forceNonParallel();
+
+ SingleOutputStreamOperator<Committable> worker =
+ source.transform(
+ "Remove Unexisting Files Worker",
+ new CommittableTypeInfo(),
+ new WorkerOperator(fileStoreTable));
+ if (parallelism != null) {
+ worker = worker.setParallelism(Math.min(parallelism, binaryPartitions.size()));
+ }
+
+ DataStream<String> result = worker.getSideOutput(RESULT_SIDE_OUTPUT);
+ if (dryRun) {
+ return result;
+ }
+
+ worker.transform(
+ "Global Committer : " + table.name(),
+ new CommittableTypeInfo(),
+ new CommitOperator(fileStoreTable))
+ .forceNonParallel();
+ return result;
+ }
+
+ @Override
+ public void run() throws Exception {
+ build();
+ env.execute("Remove Unexisting Files : " + table.name());
+ }
+
+ private static class WorkerOperator extends BoundedOneInputOperator<byte[], Committable> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FileStoreTable table;
+
+ private transient ListUnexistingFiles operation;
+ private transient BinaryRow reuse;
+
+ private WorkerOperator(FileStoreTable table) {
+ this.table = table;
+ }
+
+ @Override
+ public void open() throws Exception {
+ operation = new ListUnexistingFiles(table);
+ reuse = new BinaryRow(table.schema().partitionKeys().size());
+ }
+
+ @Override
+ public void processElement(StreamRecord<byte[]> record) throws Exception {
+ byte[] bytes = record.getValue();
+ reuse.pointTo(MemorySegment.wrap(bytes), 0, bytes.length);
+ Map<Integer, Map<String, DataFileMeta>> toDelete = operation.list(reuse);
+ for (Map.Entry<Integer, Map<String, DataFileMeta>> entry : toDelete.entrySet()) {
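+ // Report the missing files as deleted files in a DataIncrement, so that
+ // committing this message removes their manifest entries.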
+ CommitMessageImpl message =
+ new CommitMessageImpl(
+ reuse,
+ entry.getKey(),
+ new DataIncrement(
+ Collections.emptyList(),
+ new ArrayList<>(entry.getValue().values()),
+ Collections.emptyList()),
+ CompactIncrement.emptyIncrement());
+ output.collect(
+ new StreamRecord<>(
+ new Committable(Long.MAX_VALUE, Committable.Kind.FILE, message)));
+ for (String path : entry.getValue().keySet()) {
+ output.collect(RESULT_SIDE_OUTPUT, new StreamRecord<>(path));
+ }
+ }
+ }
+
+ @Override
+ public void endInput() throws Exception {}
+ }
+
+ private static class CommitOperator extends BoundedOneInputOperator<Committable, Committable> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FileStoreTable table;
+
+ private transient List<CommitMessage> commitMessages;
+ private transient TableCommitImpl commit;
+
+ private CommitOperator(FileStoreTable table) {
+ this.table = table;
+ }
+
+ @Override
+ public void open() throws Exception {
+ commitMessages = new ArrayList<>();
+ commit = table.newCommit(UUID.randomUUID().toString());
+ }
+
+ @Override
+ public void processElement(StreamRecord<Committable> record) throws Exception {
+ Committable committable = record.getValue();
+ Preconditions.checkArgument(
+ committable.kind() == Committable.Kind.FILE,
+ "Committable has kind " + committable.kind() + ". This is unexpected!");
+ commitMessages.add((CommitMessage) committable.wrappedCommittable());
+ }
+
+ @Override
+ public void endInput() throws Exception {
+ try {
+ commit.commit(Long.MAX_VALUE, commitMessages);
+ } catch (Exception e) {
+ // For batch jobs we don't know if this commit is new or being
+ // retried, so in theory we need to call filterAndCommit.
+ //
+ // However on the happy path, filtering takes time because there
+ // is no previous commit of this user, and the filtering process
+ // must go through all existing snapshots to determine this.
+ //
+ // So instead, we ask the user to retry this job if the commit
+ // failed, most probably due to a conflict. Why not throw this
+ // exception? Because throwing the exception will restart the
+ // job, and after the restart we would have to filter the commit anyway.
+ //
+ // Retrying this job will calculate what file entries to remove
+ // again, so there is no harm.
+ LOG.warn(
+ "Commit failed due to exception. "
+ + "Consider running this action or procedure again.",
+ e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (commit != null) {
+ commit.close();
+ }
+ }
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionFactory.java
new file mode 100644
index 000000000000..e4ed3bb8101f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionFactory.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Optional;
+
+/** Factory to create {@link RemoveUnexistingFilesAction}. */
+public class RemoveUnexistingFilesActionFactory implements ActionFactory {
+
+ public static final String IDENTIFIER = "remove_unexisting_files";
+ private static final String DRY_RUN = "dry_run";
+ private static final String PARALLELISM = "parallelism";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
+ RemoveUnexistingFilesAction action =
+ new RemoveUnexistingFilesAction(
+ params.getRequired(DATABASE),
+ params.getRequired(TABLE),
+ catalogConfigMap(params));
+
+ if (params.has(DRY_RUN) && Boolean.parseBoolean(params.get(DRY_RUN))) {
+ action.dryRun();
+ }
+
+ if (params.has(PARALLELISM)) {
+ action.withParallelism(Integer.parseInt(params.get(PARALLELISM)));
+ }
+
+ return Optional.of(action);
+ }
+
+ @Override
+ public void printHelp() {
+ System.out.println(
+ "Action \"remove_unexisting_files\" removes unexisting data files from manifest entries.");
+ System.out.println(
+ "See Java docs in https://paimon.apache.org/docs/master/api/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.html for detailed use cases.");
+ System.out.println(
+ "Note that user is on his own risk using this procedure, which may cause data loss when used outside from the use cases in Java docs.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ " remove_unexisting_files --warehouse --database "
+ + "--table [--partition [--partition ]] "
+ + "[--dry_run ] "
+ + "[--parallelism ]");
+ System.out.println(
+ "If partitions are not specified, this action will remove unexisting files from all partitions.");
+ System.out.println(
+ "When dry_run is set to true (default false), this action only checks what files will be removed, but not really remove them.");
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedure.java
new file mode 100644
index 000000000000..a1ec5070f364
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedure.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.action.RemoveUnexistingFilesAction;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.apache.flink.util.CloseableIterator;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Procedure to remove unexisting data files from manifest entries. See {@link
+ * RemoveUnexistingFilesAction} for detailed use cases.
+ *
+ * <pre><code>
+ *  -- remove unexisting data files in table `mydb.myt`
+ *  CALL sys.remove_unexisting_files(`table` => 'mydb.myt')
+ *
+ *  -- only check which files would be removed without actually removing them (dry run)
+ *  CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `dry_run` => true)
+ * </code></pre>
+ *
+ * <p>Note that users run this procedure at their own risk: it may cause data loss when used
+ * outside of the use cases above.
+ */
+public class RemoveUnexistingFilesProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "remove_unexisting_files";
+
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "dry_run", type = @DataTypeHint("BOOLEAN"), isOptional = true),
+ @ArgumentHint(name = "parallelism", type = @DataTypeHint("INT"), isOptional = true)
+ })
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ @Nullable Boolean dryRun,
+ @Nullable Integer parallelism)
+ throws Exception {
+ Identifier identifier = Identifier.fromString(tableId);
+ String databaseName = identifier.getDatabaseName();
+ String tableName = identifier.getObjectName();
+
+ RemoveUnexistingFilesAction action =
+ new RemoveUnexistingFilesAction(databaseName, tableName, catalog.options());
+ if (Boolean.TRUE.equals(dryRun)) {
+ action.dryRun();
+ }
+ if (parallelism != null) {
+ action.withParallelism(parallelism);
+ }
+ action.withStreamExecutionEnvironment(procedureContext.getExecutionEnvironment());
+
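+ // Run the job and collect the affected file paths as the procedure result.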
+ List<String> result = new ArrayList<>();
+ try (CloseableIterator<String> it =
+ action.buildDataStream()
+ .executeAndCollect("Remove Unexisting Files : " + tableName)) {
+ it.forEachRemaining(result::add);
+ }
+ return result.toArray(new String[0]);
+ }
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 6251189560f6..db932bb38f9c 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -43,6 +43,7 @@ org.apache.paimon.flink.action.RenameTagActionFactory
org.apache.paimon.flink.action.RepairActionFactory
org.apache.paimon.flink.action.RewriteFileIndexActionFactory
org.apache.paimon.flink.action.ExpireSnapshotsActionFactory
+org.apache.paimon.flink.action.RemoveUnexistingFilesActionFactory
### procedure factories
org.apache.paimon.flink.procedure.CompactDatabaseProcedure
@@ -82,3 +83,4 @@ org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure
org.apache.paimon.flink.procedure.CloneProcedure
org.apache.paimon.flink.procedure.CompactManifestProcedure
org.apache.paimon.flink.procedure.RefreshObjectTableProcedure
+org.apache.paimon.flink.procedure.RemoveUnexistingFilesProcedure
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionITCase.java
new file mode 100644
index 000000000000..0b32177a50e8
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionITCase.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.operation.ListUnexistingFilesTest;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link RemoveUnexistingFilesAction}. */
+public class RemoveUnexistingFilesActionITCase extends ActionITCaseBase {
+
+ @ParameterizedTest
+ @ValueSource(ints = {-1, 3})
+ public void testAction(int bucket) throws Exception {
+ int numPartitions = 2;
+ int numFiles = 10;
+ int[] numDeletes = new int[numPartitions];
+ ListUnexistingFilesTest.prepareRandomlyDeletedTable(
+ warehouse, "mydb", "t", bucket, numFiles, numDeletes);
+
+ RemoveUnexistingFilesAction action =
+ createAction(
+ RemoveUnexistingFilesAction.class,
+ "remove_unexisting_files",
+ "--warehouse",
+ warehouse,
+ "--database",
+ "mydb",
+ "--table",
+ "t",
+ "--dry_run",
+ "true")
+ .withParallelism(2);
+ int[] actual = new int[numPartitions];
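+ // Count the reported paths per partition by extracting the partition value from "pt=<n>/".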
+ Pattern pattern = Pattern.compile("pt=(\\d+?)/");
+ try (CloseableIterator<String> it = action.buildDataStream().executeAndCollect()) {
+ while (it.hasNext()) {
+ String path = it.next();
+ Matcher matcher = pattern.matcher(path);
+ if (matcher.find()) {
+ actual[Integer.parseInt(matcher.group(1))]++;
+ }
+ }
+ }
+ assertThat(actual).isEqualTo(numDeletes);
+
+ action =
+ createAction(
+ RemoveUnexistingFilesAction.class,
+ "remove_unexisting_files",
+ "--warehouse",
+ warehouse,
+ "--database",
+ "mydb",
+ "--table",
+ "t")
+ .withParallelism(2);
+ action.run();
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql(
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")");
+ tEnv.executeSql("USE CATALOG mycat");
+ try (CloseableIterator<Row> it =
+ tEnv.executeSql("SELECT pt, CAST(COUNT(*) AS INT) FROM mydb.t GROUP BY pt")
+ .collect()) {
+ while (it.hasNext()) {
+ Row row = it.next();
+ assertThat(row.getField(1)).isEqualTo(numFiles - numDeletes[(int) row.getField(0)]);
+ }
+ }
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedureITCase.java
new file mode 100644
index 000000000000..b7f20d80243f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedureITCase.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.flink.util.AbstractTestBase;
+import org.apache.paimon.operation.ListUnexistingFilesTest;
+
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases for {@link RemoveUnexistingFilesProcedure}. */
+public class RemoveUnexistingFilesProcedureITCase extends AbstractTestBase {
+
+ @ParameterizedTest
+ @ValueSource(ints = {-1, 3})
+ public void testProcedure(int bucket) throws Exception {
+ String warehouse = getTempDirPath();
+ int numPartitions = 2;
+ int numFiles = 10;
+ int[] numDeletes = new int[numPartitions];
+ ListUnexistingFilesTest.prepareRandomlyDeletedTable(
+ warehouse, "mydb", "t", bucket, numFiles, numDeletes);
+
+ TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+ tEnv.executeSql(
+ "CREATE CATALOG mycat WITH (\n"
+ + " 'type' = 'paimon',\n"
+ + " 'warehouse' = '"
+ + warehouse
+ + "'\n"
+ + ")");
+ tEnv.executeSql("USE CATALOG mycat");
+
+ int[] actual = new int[numPartitions];
+ Pattern pattern = Pattern.compile("pt=(\\d+?)/");
+ try (CloseableIterator<Row> it =
+ tEnv.executeSql(
+ "CALL sys.remove_unexisting_files(`table` => 'mydb.t', `dry_run` => true, `parallelism` => 2)")
+ .collect()) {
+ while (it.hasNext()) {
+ Row row = it.next();
+ Matcher matcher = pattern.matcher(row.getField(0).toString());
+ if (matcher.find()) {
+ actual[Integer.parseInt(matcher.group(1))]++;
+ }
+ }
+ }
+ assertThat(actual).isEqualTo(numDeletes);
+
+ tEnv.executeSql("CALL sys.remove_unexisting_files(`table` => 'mydb.t', `parallelism` => 2)")
+ .await();
+ try (CloseableIterator<Row> it =
+ tEnv.executeSql("SELECT pt, CAST(COUNT(*) AS INT) FROM mydb.t GROUP BY pt")
+ .collect()) {
+ while (it.hasNext()) {
+ Row row = it.next();
+ assertThat(row.getField(1)).isEqualTo(numFiles - numDeletes[(int) row.getField(0)]);
+ }
+ }
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
index f5052ea25f95..06f747f606c3 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java
@@ -38,6 +38,7 @@
import org.apache.paimon.spark.procedure.PurgeFilesProcedure;
import org.apache.paimon.spark.procedure.RefreshObjectTableProcedure;
import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure;
+import org.apache.paimon.spark.procedure.RemoveUnexistingFilesProcedure;
import org.apache.paimon.spark.procedure.RenameTagProcedure;
import org.apache.paimon.spark.procedure.RepairProcedure;
import org.apache.paimon.spark.procedure.ReplaceTagProcedure;
@@ -90,6 +91,7 @@ private static Map<String, Supplier<ProcedureBuilder>> initProcedureBuilders() {
procedureBuilders.put("migrate_table", MigrateTableProcedure::builder);
procedureBuilders.put("migrate_file", MigrateFileProcedure::builder);
procedureBuilders.put("remove_orphan_files", RemoveOrphanFilesProcedure::builder);
+ procedureBuilders.put("remove_unexisting_files", RemoveUnexistingFilesProcedure::builder);
procedureBuilders.put("expire_snapshots", ExpireSnapshotsProcedure::builder);
procedureBuilders.put("expire_partitions", ExpirePartitionsProcedure::builder);
procedureBuilders.put("repair", RepairProcedure::builder);
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
index dd5826420036..604b1d9b1957 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java
@@ -23,7 +23,6 @@
import org.apache.paimon.operation.LocalOrphanFilesClean;
import org.apache.paimon.operation.OrphanFilesClean;
import org.apache.paimon.spark.catalog.WithPaimonCatalog;
-import org.apache.paimon.spark.orphan.SparkOrphanFilesClean;
import org.apache.paimon.utils.Preconditions;
import org.apache.spark.sql.catalyst.InternalRow;
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedure.java
new file mode 100644
index 000000000000..555fcb145706
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedure.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure;
+
+import org.apache.paimon.spark.catalog.WithPaimonCatalog;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+/**
+ * Procedure to remove unexisting data files from manifest entries. See {@code
+ * RemoveUnexistingFilesAction} in the {@code paimon-flink-common} module for detailed use cases.
+ *
+ * <pre><code>
+ *  -- remove unexisting data files in table `mydb.myt`
+ *  CALL sys.remove_unexisting_files(`table` => 'mydb.myt')
+ *
+ *  -- only check which files would be removed without actually removing them (dry run)
+ *  CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `dry_run` => true)
+ * </code></pre>
+ *
+ * <p>Note that users run this procedure at their own risk: it may cause data loss when used
+ * outside of the use cases above.
+ */
+public class RemoveUnexistingFilesProcedure extends BaseProcedure {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RemoveUnexistingFilesProcedure.class);
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.required("table", DataTypes.StringType),
+ ProcedureParameter.optional("dry_run", DataTypes.BooleanType),
+ ProcedureParameter.optional("parallelism", DataTypes.IntegerType)
+ };
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("fileName", DataTypes.StringType, false, Metadata.empty())
+ });
+
+ private RemoveUnexistingFilesProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ String tableId = args.getString(0);
+ Preconditions.checkArgument(
+ tableId != null && !tableId.isEmpty(),
+ "Cannot handle an empty tableId for argument %s",
+ tableId);
+ org.apache.paimon.catalog.Identifier identifier =
+ org.apache.paimon.catalog.Identifier.fromString(
+ toIdentifier(args.getString(0), PARAMETERS[0].name()).toString());
+ LOG.info("identifier is {}.", identifier);
+
+ String[] result =
+ SparkRemoveUnexistingFiles.execute(
+ ((WithPaimonCatalog) tableCatalog()).paimonCatalog(),
+ identifier.getDatabaseName(),
+ identifier.getTableName(),
+ !args.isNullAt(1) && args.getBoolean(1),
+ args.isNullAt(2) ? null : args.getInt(2));
+ return Arrays.stream(result)
+ .map(path -> newInternalRow(UTF8String.fromString(path)))
+ .toArray(InternalRow[]::new);
+ }
+
+ public static ProcedureBuilder builder() {
+ return new BaseProcedure.Builder<RemoveUnexistingFilesProcedure>() {
+ @Override
+ public RemoveUnexistingFilesProcedure doBuild() {
+ return new RemoveUnexistingFilesProcedure(tableCatalog());
+ }
+ };
+ }
+
+ @Override
+ public String description() {
+ return "RemoveUnexistingFilesProcedure";
+ }
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
similarity index 99%
rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
index 16b896937961..328a11c01742 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.spark.orphan
+package org.apache.paimon.spark.procedure
import org.apache.paimon.{utils, Snapshot}
import org.apache.paimon.catalog.{Catalog, Identifier}
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala
new file mode 100644
index 000000000000..d37099388238
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.catalog.{Catalog, Identifier}
+import org.apache.paimon.data.BinaryRow
+import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement}
+import org.apache.paimon.memory.MemorySegment
+import org.apache.paimon.operation.ListUnexistingFiles
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl, CommitMessageSerializer}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.SQLConfHelper
+
+import java.util
+import java.util.{Collections, UUID}
+
+import scala.collection.JavaConverters._
+
+case class SparkRemoveUnexistingFiles(
+ table: FileStoreTable,
+ dryRun: Boolean,
+ parallelism: Int,
+ @transient spark: SparkSession)
+ extends SQLConfHelper
+ with Logging {
+
+ private def buildRDD() = {
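+ // Lists the missing files of each partition in parallel, optionally commits the
+ // removals from a single task, and returns an RDD of the affected file paths.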
+ val binaryPartitions = table.newScan().listPartitions()
+ val realParallelism = Math.min(binaryPartitions.size(), parallelism)
+
+ val numPartitionFields = table.schema().partitionKeys().size()
+ val pathAndMessage = spark.sparkContext
+ .parallelize(
+ binaryPartitions.asScala.map(partition => partition.toBytes).toSeq,
+ realParallelism)
+ .mapPartitions {
+ iter =>
+ {
+ val reuse = new BinaryRow(numPartitionFields)
+ val operation = new ListUnexistingFiles(table)
+ val serializer = new CommitMessageSerializer()
+ iter.flatMap(
+ partitionBytes => {
+ reuse.pointTo(MemorySegment.wrap(partitionBytes), 0, partitionBytes.length)
+ operation.list(reuse).asScala.map {
+ case (bucket, metaMap) =>
+ val message = new CommitMessageImpl(
+ reuse,
+ bucket,
+ new DataIncrement(
+ Collections.emptyList(),
+ new util.ArrayList[DataFileMeta](metaMap.values()),
+ Collections.emptyList()),
+ CompactIncrement.emptyIncrement())
+ (metaMap.keySet().asScala.toSeq, serializer.serialize(message))
+ }
+ })
+ }
+ }
+ .repartition(1)
+ .cache()
+
+ if (!dryRun) {
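+ // All path-and-message pairs were gathered into a single partition above, so the
+ // commit messages are deserialized and committed together from one task.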
+ pathAndMessage.foreachPartition {
+ iter =>
+ {
+ val serializer = new CommitMessageSerializer()
+ val messages = new util.ArrayList[CommitMessage]()
+ iter.foreach {
+ case (_, bytes) => messages.add(serializer.deserialize(serializer.getVersion, bytes))
+ }
+ val commit = table.newCommit(UUID.randomUUID().toString)
+ commit.commit(Long.MaxValue, messages)
+ }
+ }
+ }
+
+ pathAndMessage.mapPartitions(
+ iter => {
+ iter.flatMap { case (paths, _) => paths }
+ })
+ }
+}
+
+object SparkRemoveUnexistingFiles extends SQLConfHelper {
+
+ def execute(
+ catalog: Catalog,
+ databaseName: String,
+ tableName: String,
+ dryRun: Boolean,
+ parallelismOpt: Integer): Array[String] = {
+ val spark = SparkSession.active
+ val parallelism = if (parallelismOpt == null) {
+ Math.max(spark.sparkContext.defaultParallelism, conf.numShufflePartitions)
+ } else {
+ parallelismOpt.intValue()
+ }
+
+ val identifier = new Identifier(databaseName, tableName)
+ val table = catalog.getTable(identifier)
+ assert(
+ table.isInstanceOf[FileStoreTable],
+ s"Only FileStoreTable supports remove-unexsiting-files action. The table type is '${table.getClass.getName}'.")
+ val fileStoreTable = table.asInstanceOf[FileStoreTable]
+ SparkRemoveUnexistingFiles(fileStoreTable, dryRun, parallelism, spark).buildRDD().collect()
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedureTest.scala
new file mode 100644
index 000000000000..4b327ad6c9bd
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedureTest.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.procedure
+
+import org.apache.paimon.operation.ListUnexistingFilesTest
+import org.apache.paimon.spark.PaimonSparkTestBase
+
+import java.util.UUID
+
+class RemoveUnexistingFilesProcedureTest extends PaimonSparkTestBase {
+
+ test("Paimon procedure: remove unexisting files, bucket = -1") {
+ testImpl(-1)
+ }
+
+ test("Paimon procedure: remove unexisting files, bucket = 3") {
+ testImpl(3)
+ }
+
+ private def testImpl(bucket: Int): Unit = {
+ val warehouse = tempDBDir.getCanonicalPath
+
+ val numPartitions = 2
+ val numFiles = 10
+ val numDeletes = new Array[Int](numPartitions)
+ val tableName = "t_" + UUID.randomUUID().toString.replace("-", "_")
+ ListUnexistingFilesTest.prepareRandomlyDeletedTable(
+ warehouse,
+ "mydb",
+ tableName,
+ bucket,
+ numFiles,
+ numDeletes)
+
+ val actual = new Array[Int](numPartitions)
+ val pattern = "pt=(\\d+?)/".r
+ spark.sql(s"USE mydb")
+ spark
+ .sql(s"CALL sys.remove_unexisting_files(table => '$tableName', dry_run => true)")
+ .collect()
+ .foreach(
+ r => {
+ pattern.findFirstMatchIn(r.getString(0)) match {
+ case Some(m) => actual(m.group(1).toInt) += 1
+ case None =>
+ }
+ })
+ assert(actual.toSeq == numDeletes.toSeq)
+
+ spark.sql(s"CALL sys.remove_unexisting_files(table => '$tableName')")
+ spark
+ .sql(s"SELECT pt, CAST(COUNT(*) AS INT) FROM $tableName GROUP BY pt")
+ .collect()
+ .foreach(
+ r => {
+ assert(r.getInt(1) == numFiles - numDeletes(r.getInt(0)))
+ })
+ }
+}
diff --git a/paimon-spark/pom.xml b/paimon-spark/pom.xml
index 64c966bd2e21..93c742b15af4 100644
--- a/paimon-spark/pom.xml
+++ b/paimon-spark/pom.xml
@@ -235,6 +235,14 @@ under the License.
            <scope>test</scope>
        </dependency>

+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-core</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
        <dependency>
            <groupId>org.apache.paimon</groupId>
            <artifactId>paimon-hive-common</artifactId>
|