From 67bceb69648581db72de30d4a139479b77be1e36 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Thu, 26 Dec 2024 14:41:42 +0800 Subject: [PATCH 1/8] [flink] Add an action/procedure to remove unexisting files from manifests --- docs/content/flink/procedures.md | 29 ++ .../paimon/operation/ListUnexistingFiles.java | 68 +++++ .../operation/ListUnexistingFilesTest.java | 166 +++++++++++ .../action/RemoveUnexistingFilesAction.java | 272 ++++++++++++++++++ .../RemoveUnexistingFilesActionFactory.java | 60 ++++ .../RemoveUnexistingFilesProcedure.java | 107 +++++++ .../org.apache.paimon.factories.Factory | 2 + .../RemoveUnexistingFilesActionITCase.java | 109 +++++++ .../RemoveUnexistingFilesProcedureITCase.java | 93 ++++++ 9 files changed, 906 insertions(+) create mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java create mode 100644 paimon-core/src/test/java/org/apache/paimon/operation/ListUnexistingFilesTest.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionFactory.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedure.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionITCase.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedureITCase.java diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md index e55e41e3d92e..933190b8fb03 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -344,6 +344,35 @@ All available procedures are listed below. CALL remove_orphan_files(`table` => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => false, parallelism => '5', mode => 'local') + + remove_unexisting_files + + -- Use named argument
+ CALL [catalog.]sys.remove_unexisting_files(`table` => 'identifier', partitions => 'partitions', dry_run => 'dryRun', parallelism => 'parallelism')

+ -- Use indexed argument
+ CALL [catalog.]sys.remove_unexisting_files('identifier')

+ CALL [catalog.]sys.remove_unexisting_files('identifier', 'partitions', 'dryRun', 'parallelism') + + + Procedure to remove unexisting data files from manifest entries. See Java docs for detailed use cases. Arguments: +
  • identifier: the target table identifier. Cannot be empty; you can use database_name.* to clean a whole database.
  • +
  • partitions (optional): the partitions from which to remove unexisting files; if not set, this procedure will remove files in all partitions.
  • +
  • dryRun (optional): only check which files would be removed, without actually removing them. Default is false.
  • +
  • parallelism (optional): the parallelism for checking files in the manifests.
  • +
+ Note that you use this procedure at your own risk: it may cause data loss when used outside the use cases listed in the Java docs. + + + -- remove unexisting data files in all partitions of the table `mydb.myt` + CALL sys.remove_unexisting_files(`table` => 'mydb.myt') +
+ -- remove unexisting data files only in partitions `pt = 0` and `pt = 1` of the table `mydb.myt` + CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `partitions` => 'pt=0;pt=1') +
    + -- only check what files will be removed, but not really remove them (dry run) + CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `dry_run` = true) + + reset_consumer diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java b/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java new file mode 100644 index 000000000000..a22b65e118e5 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.utils.FileStorePathFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** List what data files recorded in manifests are missing from the filesystem. */ +public class ListUnexistingFiles { + + private final FileStoreTable table; + + public ListUnexistingFiles(FileStoreTable table) { + this.table = table; + } + + public Map> list(BinaryRow partition) throws Exception { + FileIO fileIO = table.fileIO(); + FileStorePathFactory pathFactory = table.store().pathFactory(); + Map> result = new HashMap<>(); + List splits = + table.newScan() + .withPartitionFilter(Collections.singletonList(partition)) + .plan() + .splits(); + for (Split split : splits) { + DataSplit dataSplit = (DataSplit) split; + DataFilePathFactory dataFilePathFactory = + pathFactory.createDataFilePathFactory(partition, dataSplit.bucket()); + for (DataFileMeta meta : dataSplit.dataFiles()) { + Path path = dataFilePathFactory.toPath(meta); + if (!fileIO.exists(path)) { + result.computeIfAbsent(dataSplit.bucket(), k -> new HashMap<>()) + .put(path.toString(), meta); + } + } + } + return result; + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ListUnexistingFilesTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ListUnexistingFilesTest.java new file mode 100644 index 000000000000..0ecad4120d9b --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ListUnexistingFilesTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.CoreOptions; +import org.apache.paimon.catalog.FileSystemCatalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.BinaryRowWriter; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.fs.FileStatus; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.schema.Schema; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.table.sink.TableWriteImpl; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; + +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link ListUnexistingFiles}. 
*/ +public class ListUnexistingFilesTest { + + @TempDir java.nio.file.Path tempDir; + + @ParameterizedTest + @ValueSource(ints = {-1, 3}) + public void testListFiles(int bucket) throws Exception { + int numPartitions = 2; + int numFiles = 10; + int[] numDeletes = new int[numPartitions]; + FileStoreTable table = + prepareRandomlyDeletedTable(tempDir.toString(), bucket, numFiles, numDeletes); + + Function binaryRow = + i -> { + BinaryRow b = new BinaryRow(1); + BinaryRowWriter writer = new BinaryRowWriter(b); + writer.writeInt(0, i); + writer.complete(); + return b; + }; + + ListUnexistingFiles operation = new ListUnexistingFiles(table); + for (int i = 0; i < numPartitions; i++) { + Map> result = operation.list(binaryRow.apply(i)); + assertThat(result.values().stream().mapToInt(Map::size).sum()).isEqualTo(numDeletes[i]); + } + } + + public static FileStoreTable prepareRandomlyDeletedTable( + String warehouse, int bucket, int numFiles, int[] numDeletes) throws Exception { + RowType rowType = + RowType.of( + new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()}, + new String[] {"pt", "id", "v"}); + Map options = new HashMap<>(); + options.put(CoreOptions.BUCKET.key(), String.valueOf(bucket)); + options.put(CoreOptions.WRITE_ONLY.key(), "true"); + if (bucket > 0) { + options.put(CoreOptions.BUCKET_KEY.key(), "id"); + } + FileStoreTable table = + createPaimonTable(warehouse, rowType, Collections.singletonList("pt"), options); + + String commitUser = UUID.randomUUID().toString(); + TableWriteImpl write = table.newWrite(commitUser); + TableCommitImpl commit = table.newCommit(commitUser); + + ThreadLocalRandom random = ThreadLocalRandom.current(); + int numPartitions = numDeletes.length; + for (int i = 0; i < numPartitions; i++) { + numDeletes[i] = random.nextInt(0, numFiles + 1); + } + + int identifier = 0; + for (int i = 0; i < numPartitions; i++) { + for (int j = 0; j < numFiles; j++) { + write.write(GenericRow.of(i, random.nextInt(), random.nextLong())); + identifier++; + commit.commit(identifier, write.prepareCommit(false, identifier)); + } + } + + write.close(); + commit.close(); + + for (int i = 0; i < numPartitions; i++) { + LocalFileIO fileIO = LocalFileIO.create(); + List paths = new ArrayList<>(); + for (int j = 0; j < Math.max(1, bucket); j++) { + Path path = new Path(table.location(), "pt=" + i + "/bucket-" + j); + paths.addAll( + Arrays.stream(fileIO.listStatus(path)) + .map(FileStatus::getPath) + .collect(Collectors.toList())); + } + Collections.shuffle(paths); + for (int j = 0; j < numDeletes[i]; j++) { + fileIO.deleteQuietly(paths.get(j)); + } + } + + return table; + } + + private static FileStoreTable createPaimonTable( + String warehouse, + RowType rowType, + List partitionKeys, + Map customOptions) + throws Exception { + LocalFileIO fileIO = LocalFileIO.create(); + Path path = new Path(warehouse); + + Schema schema = + new Schema( + rowType.getFields(), + partitionKeys, + Collections.emptyList(), + customOptions, + ""); + + try (FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path)) { + paimonCatalog.createDatabase("mydb", false); + Identifier paimonIdentifier = Identifier.create("mydb", "t"); + paimonCatalog.createTable(paimonIdentifier, schema, false); + return (FileStoreTable) paimonCatalog.getTable(paimonIdentifier); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java new file mode 100644 index 000000000000..615f4f88ec38 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.flink.sink.CommittableTypeInfo; +import org.apache.paimon.flink.utils.BoundedOneInputOperator; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.memory.MemorySegment; +import org.apache.paimon.operation.ListUnexistingFiles; +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.table.sink.TableCommitImpl; +import org.apache.paimon.utils.Preconditions; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +/** + * Action to remove unexisting data files from manifest entries. It has the following use cases: + * + *
      + *
  • There is currently a known case where an unexisting data file might be written into the manifest. + * Consider a write-only job (W) plus a dedicated compaction job (C): *
        + *
  1. W commits a snapshot with file F. Then W repeatedly fails and restarts, each time + * before it can retry the commit (or W is stopped, creating Flink savepoint S). *
      2. C compacts F into a larger file, so F is now deleted from the manifest. + *
  3. Before the compact snapshot expires, but after all snapshots created by W expire, W + * comes back to normal. As W cannot find its previous snapshot, it assumes that this + * snapshot has not been committed (see {@link + * org.apache.paimon.operation.FileStoreCommitImpl#filterCommitted} for more detail), so + * file F is committed to the manifest once again. *
  4. When the compact snapshot expires, file F will be deleted from the file system. Now F + * is in the manifest, but not on the file system. In this situation, the user might want + * to remove F from the manifest to continue reading the table. *
      + *
  • A user deletes a data file by mistake (for example, by incorrectly setting the time threshold + * for orphan file cleaning). If the user can tolerate skipping some records when consuming + * this table, they can also use this action to remove the file from the manifest; see the + * usage sketch below. *
    + * + *
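 + *
 + * <p>For illustration, a minimal sketch of invoking this action programmatically (the catalog
 + * options and table names below are assumptions for the example, not part of this patch; it
 + * also assumes the action picks up a default Flink execution environment):
 + *
 + * <pre>{@code
 + * Map<String, String> catalogConfig = new HashMap<>();
 + * catalogConfig.put("warehouse", "/path/to/warehouse");
 + * RemoveUnexistingFilesAction action =
 + *         new RemoveUnexistingFilesAction("mydb", "myt", catalogConfig)
 + *                 .dryRun()
 + *                 .withParallelism(2);
 + * action.run();
 + * }</pre>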

    Note that user is on his own risk using this procedure, which may cause data loss when used + * outside from the use cases above. + */ +public class RemoveUnexistingFilesAction extends TableActionBase { + + private static final OutputTag RESULT_SIDE_OUTPUT = + new OutputTag<>("result-side-output", BasicTypeInfo.STRING_TYPE_INFO); + + @Nullable private List> partitions = null; + private boolean dryRun = false; + @Nullable private Integer parallelism = null; + + public RemoveUnexistingFilesAction( + String databaseName, String tableName, Map catalogConfig) { + super(databaseName, tableName, catalogConfig); + } + + public RemoveUnexistingFilesAction withPartitions(List> partitions) { + this.partitions = partitions; + return this; + } + + public RemoveUnexistingFilesAction dryRun() { + this.dryRun = true; + return this; + } + + public RemoveUnexistingFilesAction withParallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } + + @Override + public void build() throws Exception { + buildDataStream(); + } + + public DataStream buildDataStream() throws Exception { + FileStoreTable fileStoreTable = (FileStoreTable) table; + List binaryPartitions; + if (partitions == null) { + binaryPartitions = ((FileStoreTable) table).newScan().listPartitions(); + } else { + binaryPartitions = + PartitionPredicate.createBinaryPartitions( + partitions, + fileStoreTable.schema().logicalPartitionType(), + fileStoreTable.coreOptions().partitionDefaultName()); + } + + SingleOutputStreamOperator source = + env.fromData( + binaryPartitions.stream() + .map(BinaryRow::toBytes) + .collect(Collectors.toList()), + PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO) + .name("Remove Unexisting Files Source") + .forceNonParallel(); + + SingleOutputStreamOperator worker = + source.transform( + "Remove Unexisting Files Worker", + new CommittableTypeInfo(), + new WorkerOperator(fileStoreTable)); + if (parallelism != null) { + worker = worker.setParallelism(parallelism); + } + + DataStream result = worker.getSideOutput(RESULT_SIDE_OUTPUT); + if (dryRun) { + return result; + } + + worker.transform( + "Global Committer : " + table.name(), + new CommittableTypeInfo(), + new CommitOperator(fileStoreTable)) + .forceNonParallel(); + return result; + } + + @Override + public void run() throws Exception { + build(); + env.execute("Remove Unexisting Files : " + table.name()); + } + + private static class WorkerOperator extends BoundedOneInputOperator { + + private static final long serialVersionUID = 1L; + + private final FileStoreTable table; + + private transient ListUnexistingFiles operation; + private transient BinaryRow reuse; + + private WorkerOperator(FileStoreTable table) { + this.table = table; + } + + @Override + public void open() throws Exception { + operation = new ListUnexistingFiles(table); + reuse = new BinaryRow(table.schema().partitionKeys().size()); + } + + @Override + public void processElement(StreamRecord record) throws Exception { + byte[] bytes = record.getValue(); + reuse.pointTo(MemorySegment.wrap(bytes), 0, bytes.length); + Map> toDelete = operation.list(reuse); + for (Map.Entry> entry : toDelete.entrySet()) { + CommitMessageImpl message = + new CommitMessageImpl( + reuse, + entry.getKey(), + new DataIncrement( + Collections.emptyList(), + new ArrayList<>(entry.getValue().values()), + Collections.emptyList()), + CompactIncrement.emptyIncrement()); + output.collect( + new StreamRecord<>( + new Committable(Long.MAX_VALUE, Committable.Kind.FILE, message))); + for (String path : 
entry.getValue().keySet()) { + output.collect(RESULT_SIDE_OUTPUT, new StreamRecord<>(path)); + } + } + } + + @Override + public void endInput() throws Exception {} + } + + private static class CommitOperator extends BoundedOneInputOperator { + + private static final long serialVersionUID = 1L; + + private final FileStoreTable table; + + private transient List commitMessages; + private transient TableCommitImpl commit; + + private CommitOperator(FileStoreTable table) { + this.table = table; + } + + @Override + public void open() throws Exception { + commitMessages = new ArrayList<>(); + commit = table.newCommit(UUID.randomUUID().toString()); + } + + @Override + public void processElement(StreamRecord record) throws Exception { + Committable committable = record.getValue(); + Preconditions.checkArgument( + committable.kind() == Committable.Kind.FILE, + "Committable has kind " + committable.kind() + ". This is unexpected!"); + commitMessages.add((CommitMessage) committable.wrappedCommittable()); + } + + @Override + public void endInput() throws Exception { + try { + commit.commit(Long.MAX_VALUE, commitMessages); + } catch (Exception e) { + // For batch jobs we don't know if this commit is new or being + // retried, so in theory we need to call filterAndCommit. + // + // However on the happy path, filtering takes time because there + // is no previous commit of this user, and the filtering process + // must go through all existing snapshots to determine this. + // + // So instead, we ask the user to retry this job if the commit + // failed, most probably due to a conflict. Why not throw this + // exception? Because throwing the exception will restart the + // job, if it is a batch job we'll have to filter the commit. + // + // Retrying this job will calculate what file entries to remove + // again, so there is no harm. + LOG.warn( + "Commit failed due to exception. " + + "Consider running this action or procedure again.", + e); + } + } + + @Override + public void close() throws Exception { + if (commit != null) { + commit.close(); + } + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionFactory.java new file mode 100644 index 000000000000..e1cbfe18cc2a --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionFactory.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import java.util.Optional; + +/** Factory to create {@link RemoveUnexistingFilesAction}. 
*/ +public class RemoveUnexistingFilesActionFactory implements ActionFactory { + + public static final String IDENTIFIER = "remove_unexisting_files"; + private static final String DRY_RUN = "dry_run"; + private static final String PARALLELISM = "parallelism"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Optional create(MultipleParameterToolAdapter params) { + RemoveUnexistingFilesAction action = + new RemoveUnexistingFilesAction( + params.getRequired(DATABASE), + params.getRequired(TABLE), + catalogConfigMap(params)); + + if (params.has(PARTITION)) { + action.withPartitions(getPartitions(params)); + } + + if (params.has(DRY_RUN) && Boolean.parseBoolean(params.get(DRY_RUN))) { + action.dryRun(); + } + + if (params.has(PARALLELISM)) { + action.withParallelism(Integer.parseInt(params.get(PARALLELISM))); + } + + return Optional.of(action); + } + + @Override + public void printHelp() {} +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedure.java new file mode 100644 index 000000000000..7a4f9149bfef --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedure.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.action.RemoveUnexistingFilesAction; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; +import org.apache.flink.util.CloseableIterator; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.List; + +import static org.apache.paimon.utils.ParameterUtils.getPartitions; +import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly; + +/** + * Procedure to remove unexisting data files from manifest entries. See {@link + * RemoveUnexistingFilesAction} for detailed use cases. + * + *

    
    + *  -- remove unexisting data files in all partitions of the table `mydb.myt`
    + *  CALL sys.remove_unexisting_files(`table` => 'mydb.myt')
    + *
    + *  -- remove unexisting data files only in partitions `pt = 0` and `pt = 1` of the table `mydb.myt`
+ *  CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `partitions` => 'pt=0;pt=1')
    + *
    + *  -- only check what files will be removed, but not really remove them (dry run)
+ *  CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `dry_run` => true)
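+ *
+ *  -- pass an explicit parallelism for checking manifests (the value 8 here is illustrative)
+ *  CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `parallelism` => 8)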
    + * 
    + * + *
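 + * <p>For illustration, a minimal sketch of calling this procedure from a Flink {@code
 + * TableEnvironment} in batch mode (the catalog name and warehouse path are assumptions, not
 + * part of this patch):
 + *
 + * <pre>{@code
 + * TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
 + * tEnv.executeSql(
 + *         "CREATE CATALOG mycat WITH ('type' = 'paimon', 'warehouse' = '/path/to/warehouse')");
 + * tEnv.executeSql("USE CATALOG mycat");
 + * // a dry run returns the paths that would be removed without modifying the table
 + * tEnv.executeSql(
 + *                 "CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `dry_run` => true)")
 + *         .print();
 + * }</pre>
 + *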

    Note that user is on his own risk using this procedure, which may cause data loss when used + * outside from the use cases above. + */ +public class RemoveUnexistingFilesProcedure extends ProcedureBase { + + public static final String IDENTIFIER = "remove_unexisting_files"; + + @ProcedureHint( + argument = { + @ArgumentHint(name = "table", type = @DataTypeHint("STRING")), + @ArgumentHint( + name = "partitions", + type = @DataTypeHint("STRING"), + isOptional = true), + @ArgumentHint(name = "dry_run", type = @DataTypeHint("BOOLEAN"), isOptional = true), + @ArgumentHint(name = "parallelism", type = @DataTypeHint("INT"), isOptional = true) + }) + public String[] call( + ProcedureContext procedureContext, + String tableId, + @Nullable String partitions, + @Nullable Boolean dryRun, + @Nullable Integer parallelism) + throws Exception { + Identifier identifier = Identifier.fromString(tableId); + String databaseName = identifier.getDatabaseName(); + String tableName = identifier.getObjectName(); + + RemoveUnexistingFilesAction action = + new RemoveUnexistingFilesAction(databaseName, tableName, catalog.options()); + if (!(isNullOrWhitespaceOnly(partitions))) { + action.withPartitions(getPartitions(partitions.split(";"))); + } + if (Boolean.TRUE.equals(dryRun)) { + action.dryRun(); + } + if (parallelism != null) { + action.withParallelism(parallelism); + } + action.withStreamExecutionEnvironment(procedureContext.getExecutionEnvironment()); + + List result = new ArrayList<>(); + try (CloseableIterator it = + action.buildDataStream() + .executeAndCollect("Remove Unexisting Files : " + tableName)) { + it.forEachRemaining(result::add); + } + return result.toArray(new String[0]); + } + + @Override + public String identifier() { + return IDENTIFIER; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 6251189560f6..db932bb38f9c 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -43,6 +43,7 @@ org.apache.paimon.flink.action.RenameTagActionFactory org.apache.paimon.flink.action.RepairActionFactory org.apache.paimon.flink.action.RewriteFileIndexActionFactory org.apache.paimon.flink.action.ExpireSnapshotsActionFactory +org.apache.paimon.flink.action.RemoveUnexistingFilesActionFactory ### procedure factories org.apache.paimon.flink.procedure.CompactDatabaseProcedure @@ -82,3 +83,4 @@ org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure org.apache.paimon.flink.procedure.CloneProcedure org.apache.paimon.flink.procedure.CompactManifestProcedure org.apache.paimon.flink.procedure.RefreshObjectTableProcedure +org.apache.paimon.flink.procedure.RemoveUnexistingFilesProcedure diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionITCase.java new file mode 100644 index 000000000000..307007fd913d --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionITCase.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.operation.ListUnexistingFilesTest; + +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.function.Function; + +import static org.assertj.core.api.Assertions.assertThat; + +/** IT cases for {@link RemoveUnexistingFilesAction}. */ +public class RemoveUnexistingFilesActionITCase extends ActionITCaseBase { + + @ParameterizedTest + @ValueSource(ints = {-1, 3}) + public void testAction(int bucket) throws Exception { + int numPartitions = 2; + int numFiles = 10; + int[] numDeletes = new int[numPartitions]; + ListUnexistingFilesTest.prepareRandomlyDeletedTable( + warehouse, bucket, numFiles, numDeletes); + + Function runAction = + action -> { + int cnt = 0; + try (CloseableIterator it = + action.buildDataStream().executeAndCollect()) { + while (it.hasNext()) { + cnt++; + it.next(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return cnt; + }; + + for (int i = 0; i < numPartitions; i++) { + RemoveUnexistingFilesAction action = + createAction( + RemoveUnexistingFilesAction.class, + "remove_unexisting_files", + "--warehouse", + warehouse, + "--database", + "mydb", + "--table", + "t", + "--partition", + "pt=" + i, + "--dry_run", + "true"); + assertThat(runAction.apply(action)).isEqualTo(numDeletes[i]); + } + + RemoveUnexistingFilesAction action = + createAction( + RemoveUnexistingFilesAction.class, + "remove_unexisting_files", + "--warehouse", + warehouse, + "--database", + "mydb", + "--table", + "t"); + assertThat(runAction.apply(action)).isEqualTo(Arrays.stream(numDeletes).sum()); + + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql( + "CREATE CATALOG mycat WITH (\n" + + " 'type' = 'paimon',\n" + + " 'warehouse' = '" + + warehouse + + "'\n" + + ")"); + tEnv.executeSql("USE CATALOG mycat"); + try (CloseableIterator it = + tEnv.executeSql("SELECT pt, CAST(COUNT(*) AS INT) FROM mydb.t GROUP BY pt") + .collect()) { + while (it.hasNext()) { + Row row = it.next(); + assertThat(row.getField(1)).isEqualTo(numFiles - numDeletes[(int) row.getField(0)]); + } + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedureITCase.java new file mode 100644 index 000000000000..fad002b1fdb8 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedureITCase.java @@ -0,0 +1,93 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.flink.util.AbstractTestBase; +import org.apache.paimon.operation.ListUnexistingFilesTest; + +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.types.Row; +import org.apache.flink.util.CloseableIterator; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.function.Function; + +import static org.assertj.core.api.Assertions.assertThat; + +/** IT cases for {@link RemoveUnexistingFilesProcedure}. */ +public class RemoveUnexistingFilesProcedureITCase extends AbstractTestBase { + + @ParameterizedTest + @ValueSource(ints = {-1, 3}) + public void testProcedure(int bucket) throws Exception { + String warehouse = getTempDirPath(); + int numPartitions = 2; + int numFiles = 10; + int[] numDeletes = new int[numPartitions]; + ListUnexistingFilesTest.prepareRandomlyDeletedTable( + warehouse, bucket, numFiles, numDeletes); + + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql( + "CREATE CATALOG mycat WITH (\n" + + " 'type' = 'paimon',\n" + + " 'warehouse' = '" + + warehouse + + "'\n" + + ")"); + tEnv.executeSql("USE CATALOG mycat"); + + Function runProcedure = + sql -> { + int cnt = 0; + try (CloseableIterator it = tEnv.executeSql(sql).collect()) { + while (it.hasNext()) { + cnt++; + it.next(); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + return cnt; + }; + + for (int i = 0; i < numPartitions; i++) { + assertThat( + runProcedure.apply( + "CALL sys.remove_unexisting_files(`table` => 'mydb.t', `partitions` => 'pt = " + + i + + "', `dry_run` => true)")) + .isEqualTo(numDeletes[i]); + } + + assertThat(runProcedure.apply("CALL sys.remove_unexisting_files(`table` => 'mydb.t')")) + .isEqualTo(Arrays.stream(numDeletes).sum()); + + try (CloseableIterator it = + tEnv.executeSql("SELECT pt, CAST(COUNT(*) AS INT) FROM mydb.t GROUP BY pt") + .collect()) { + while (it.hasNext()) { + Row row = it.next(); + assertThat(row.getField(1)).isEqualTo(numFiles - numDeletes[(int) row.getField(0)]); + } + } + } +} From 21dc5ffc47aa5aa274931e03db16a84ffe2570b9 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Thu, 26 Dec 2024 15:12:00 +0800 Subject: [PATCH 2/8] [fix] Add more comments --- .../paimon/flink/action/RemoveUnexistingFilesAction.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java index 615f4f88ec38..1f6a5fda2081 100644 --- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java @@ -61,8 +61,8 @@ * before it can retry the commit (or W is stopped, creating Flink savepoint S). *

  • C compacts F into a larger file, so F is now deleted from the manifest. *
  • Before the compact snapshot expires, but after all snapshots created by W expire, W - * comes back to normal. As W cannot find its previous snapshot, it assumes that this - * snapshot has not been committed (see {@link + * comes back to normal (or is restarted from savepoint S). As W cannot find its previous + * snapshot, it assumes that this snapshot has not been committed (see {@link * org.apache.paimon.operation.FileStoreCommitImpl#filterCommitted} for more detail), so * file F is committed to the manifest once again. *
  • When the compact snapshot expires, file F will be deleted from the file system. Now F From f0b4b64b714d920db47ae7b3b417a884eaa5fe2f Mon Sep 17 00:00:00 2001 From: tsreaper Date: Fri, 27 Dec 2024 15:01:41 +0800 Subject: [PATCH 3/8] [fix] Add printHelp for action --- .../RemoveUnexistingFilesActionFactory.java | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionFactory.java index e1cbfe18cc2a..d082772e4cec 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionFactory.java @@ -56,5 +56,24 @@ public Optional create(MultipleParameterToolAdapter params) { } @Override - public void printHelp() {} + public void printHelp() { + System.out.println( + "Action \"remove_unexisting_files\" removes unexisting data files from manifest entries."); + System.out.println( + "See Java docs in https://paimon.apache.org/docs/master/api/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.html for detailed use cases."); + System.out.println( + "Note that user is on his own risk using this procedure, which may cause data loss when used outside from the use cases in Java docs."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " remove_unexisting_files --warehouse --database " + + "--table [--partition [--partition ]] " + + "[--dry_run ] " + + "[--parallelism ]"); + System.out.println( + "If partitions are not specified, this action will remove unexisting files from all partitions."); + System.out.println( + "When dry_run is set to true (default false), this action only checks what files will be removed, but not really remove them."); + } } From 056554e6b91ee908714258766f06e3a6fbcea446 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Mon, 6 Jan 2025 14:15:24 +0800 Subject: [PATCH 4/8] [fix] Remove partition option and add thread pool for checking files --- docs/content/flink/procedures.md | 10 +-- .../paimon/operation/ListUnexistingFiles.java | 41 +++++++--- .../action/RemoveUnexistingFilesAction.java | 18 +---- .../RemoveUnexistingFilesActionFactory.java | 4 - .../RemoveUnexistingFilesProcedure.java | 16 +--- .../RemoveUnexistingFilesActionITCase.java | 81 +++++++++---------- .../RemoveUnexistingFilesProcedureITCase.java | 44 +++++----- 7 files changed, 89 insertions(+), 125 deletions(-) diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md index 933190b8fb03..4d1ad27fbdcf 100644 --- a/docs/content/flink/procedures.md +++ b/docs/content/flink/procedures.md @@ -348,27 +348,23 @@ All available procedures are listed below. remove_unexisting_files -- Use named argument
    - CALL [catalog.]sys.remove_unexisting_files(`table` => 'identifier', partitions => 'partitions', dry_run => 'dryRun', parallelism => 'parallelism')

    + CALL [catalog.]sys.remove_unexisting_files(`table` => 'identifier', dry_run => 'dryRun', parallelism => 'parallelism')

    -- Use indexed argument
    CALL [catalog.]sys.remove_unexisting_files('identifier')

    - CALL [catalog.]sys.remove_unexisting_files('identifier', 'partitions', 'dryRun', 'parallelism') + CALL [catalog.]sys.remove_unexisting_files('identifier', 'dryRun', 'parallelism') Procedure to remove unexisting data files from manifest entries. See Java docs for detailed use cases. Arguments:
  • identifier: the target table identifier. Cannot be empty; you can use database_name.* to clean a whole database.
  • -
  • partitions (optional): the partitions from which to remove unexisting files; if not set, this procedure will remove files in all partitions.
  • dryRun (optional): only check which files would be removed, without actually removing them. Default is false.
  • parallelism (optional): the parallelism for checking files in the manifests.

  • Note that you use this procedure at your own risk: it may cause data loss when used outside the use cases listed in the Java docs.
 - -- remove unexisting data files in all partitions of the table `mydb.myt`
 + -- remove unexisting data files in the table `mydb.myt`
 CALL sys.remove_unexisting_files(`table` => 'mydb.myt')
- -- remove unexisting data files only in partitions `pt = 0` and `pt = 1` of the table `mydb.myt`
- CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `partitions` => 'pt=0;pt=1')
-
    -- only check what files will be removed, but not really remove them (dry run) CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `dry_run` = true) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java b/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java index a22b65e118e5..29082ed265e9 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java @@ -19,7 +19,6 @@ package org.apache.paimon.operation; import org.apache.paimon.data.BinaryRow; -import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; @@ -27,42 +26,60 @@ import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.Split; import org.apache.paimon.utils.FileStorePathFactory; +import org.apache.paimon.utils.ThreadPoolUtils; +import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ThreadPoolExecutor; + +import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; /** List what data files recorded in manifests are missing from the filesystem. */ public class ListUnexistingFiles { private final FileStoreTable table; + private final FileStorePathFactory pathFactory; + private final ThreadPoolExecutor executor; public ListUnexistingFiles(FileStoreTable table) { this.table = table; + this.pathFactory = table.store().pathFactory(); + this.executor = + createCachedThreadPool( + table.coreOptions().deleteFileThreadNum(), "LIST_UNEXISTING_FILES"); } public Map> list(BinaryRow partition) throws Exception { - FileIO fileIO = table.fileIO(); - FileStorePathFactory pathFactory = table.store().pathFactory(); - Map> result = new HashMap<>(); + Map> result = new ConcurrentHashMap<>(); List splits = table.newScan() .withPartitionFilter(Collections.singletonList(partition)) .plan() .splits(); - for (Split split : splits) { - DataSplit dataSplit = (DataSplit) split; - DataFilePathFactory dataFilePathFactory = - pathFactory.createDataFilePathFactory(partition, dataSplit.bucket()); - for (DataFileMeta meta : dataSplit.dataFiles()) { - Path path = dataFilePathFactory.toPath(meta); - if (!fileIO.exists(path)) { + ThreadPoolUtils.randomlyOnlyExecute( + executor, split -> listFilesInDataSplit((DataSplit) split, result), splits); + return result; + } + + private void listFilesInDataSplit( + DataSplit dataSplit, Map> result) { + DataFilePathFactory dataFilePathFactory = + pathFactory.createDataFilePathFactory(dataSplit.partition(), dataSplit.bucket()); + for (DataFileMeta meta : dataSplit.dataFiles()) { + Path path = dataFilePathFactory.toPath(meta); + try { + if (!table.fileIO().exists(path)) { result.computeIfAbsent(dataSplit.bucket(), k -> new HashMap<>()) .put(path.toString(), meta); } + } catch (IOException e) { + throw new UncheckedIOException("Cannot determine if file " + path + " exists.", e); } } - return result; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java index 1f6a5fda2081..028a59771d88 100644 --- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java @@ -27,7 +27,6 @@ import org.apache.paimon.io.DataIncrement; import org.apache.paimon.memory.MemorySegment; import org.apache.paimon.operation.ListUnexistingFiles; -import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; @@ -82,7 +81,6 @@ public class RemoveUnexistingFilesAction extends TableActionBase { private static final OutputTag RESULT_SIDE_OUTPUT = new OutputTag<>("result-side-output", BasicTypeInfo.STRING_TYPE_INFO); - @Nullable private List> partitions = null; private boolean dryRun = false; @Nullable private Integer parallelism = null; @@ -91,11 +89,6 @@ public RemoveUnexistingFilesAction( super(databaseName, tableName, catalogConfig); } - public RemoveUnexistingFilesAction withPartitions(List> partitions) { - this.partitions = partitions; - return this; - } - public RemoveUnexistingFilesAction dryRun() { this.dryRun = true; return this; @@ -113,16 +106,7 @@ public void build() throws Exception { public DataStream buildDataStream() throws Exception { FileStoreTable fileStoreTable = (FileStoreTable) table; - List binaryPartitions; - if (partitions == null) { - binaryPartitions = ((FileStoreTable) table).newScan().listPartitions(); - } else { - binaryPartitions = - PartitionPredicate.createBinaryPartitions( - partitions, - fileStoreTable.schema().logicalPartitionType(), - fileStoreTable.coreOptions().partitionDefaultName()); - } + List binaryPartitions = ((FileStoreTable) table).newScan().listPartitions(); SingleOutputStreamOperator source = env.fromData( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionFactory.java index d082772e4cec..e4ed3bb8101f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionFactory.java @@ -40,10 +40,6 @@ public Optional create(MultipleParameterToolAdapter params) { params.getRequired(TABLE), catalogConfigMap(params)); - if (params.has(PARTITION)) { - action.withPartitions(getPartitions(params)); - } - if (params.has(DRY_RUN) && Boolean.parseBoolean(params.get(DRY_RUN))) { action.dryRun(); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedure.java index 7a4f9149bfef..a1ec5070f364 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedure.java @@ -32,20 +32,14 @@ import java.util.ArrayList; import java.util.List; -import static org.apache.paimon.utils.ParameterUtils.getPartitions; -import static org.apache.paimon.utils.StringUtils.isNullOrWhitespaceOnly; - /** * Procedure to remove unexisting data files from manifest entries. 
See {@link * RemoveUnexistingFilesAction} for detailed use cases. * *
    
    - *  -- remove unexisting data files in all partitions of the table `mydb.myt`
+ *  -- remove unexisting data files in the table `mydb.myt`
      *  CALL sys.remove_unexisting_files(`table` => 'mydb.myt')
      *
    - *  -- remove unexisting data files only in partitions `pt = 0` and `pt = 1` of the table `mydb.myt`
- *  CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `partitions` => 'pt=0;pt=1')
    - *
      *  -- only check what files will be removed, but not really remove them (dry run)
  *  CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `dry_run` => true)
      * 
    @@ -60,17 +54,12 @@ public class RemoveUnexistingFilesProcedure extends ProcedureBase { @ProcedureHint( argument = { @ArgumentHint(name = "table", type = @DataTypeHint("STRING")), - @ArgumentHint( - name = "partitions", - type = @DataTypeHint("STRING"), - isOptional = true), @ArgumentHint(name = "dry_run", type = @DataTypeHint("BOOLEAN"), isOptional = true), @ArgumentHint(name = "parallelism", type = @DataTypeHint("INT"), isOptional = true) }) public String[] call( ProcedureContext procedureContext, String tableId, - @Nullable String partitions, @Nullable Boolean dryRun, @Nullable Integer parallelism) throws Exception { @@ -80,9 +69,6 @@ public String[] call( RemoveUnexistingFilesAction action = new RemoveUnexistingFilesAction(databaseName, tableName, catalog.options()); - if (!(isNullOrWhitespaceOnly(partitions))) { - action.withPartitions(getPartitions(partitions.split(";"))); - } if (Boolean.TRUE.equals(dryRun)) { action.dryRun(); } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionITCase.java index 307007fd913d..ef9059008372 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionITCase.java @@ -26,8 +26,8 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import java.util.Arrays; -import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.assertj.core.api.Assertions.assertThat; @@ -43,51 +43,44 @@ public void testAction(int bucket) throws Exception { ListUnexistingFilesTest.prepareRandomlyDeletedTable( warehouse, bucket, numFiles, numDeletes); - Function runAction = - action -> { - int cnt = 0; - try (CloseableIterator it = - action.buildDataStream().executeAndCollect()) { - while (it.hasNext()) { - cnt++; - it.next(); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - return cnt; - }; - - for (int i = 0; i < numPartitions; i++) { - RemoveUnexistingFilesAction action = - createAction( - RemoveUnexistingFilesAction.class, - "remove_unexisting_files", - "--warehouse", - warehouse, - "--database", - "mydb", - "--table", - "t", - "--partition", - "pt=" + i, - "--dry_run", - "true"); - assertThat(runAction.apply(action)).isEqualTo(numDeletes[i]); - } - RemoveUnexistingFilesAction action = createAction( - RemoveUnexistingFilesAction.class, - "remove_unexisting_files", - "--warehouse", - warehouse, - "--database", - "mydb", - "--table", - "t"); - assertThat(runAction.apply(action)).isEqualTo(Arrays.stream(numDeletes).sum()); + RemoveUnexistingFilesAction.class, + "remove_unexisting_files", + "--warehouse", + warehouse, + "--database", + "mydb", + "--table", + "t", + "--dry_run", + "true") + .withParallelism(2); + int[] actual = new int[numPartitions]; + Pattern pattern = Pattern.compile("pt=(\\d+?)/"); + try (CloseableIterator it = action.buildDataStream().executeAndCollect()) { + while (it.hasNext()) { + String path = it.next(); + Matcher matcher = pattern.matcher(path); + if (matcher.find()) { + actual[Integer.parseInt(matcher.group(1))]++; + } + } + } + assertThat(actual).isEqualTo(numDeletes); + action = + createAction( + RemoveUnexistingFilesAction.class, + "remove_unexisting_files", 
+ "--warehouse", + warehouse, + "--database", + "mydb", + "--table", + "t") + .withParallelism(2); + action.run(); TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); tEnv.executeSql( "CREATE CATALOG mycat WITH (\n" diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedureITCase.java index fad002b1fdb8..f983784146ed 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedureITCase.java @@ -27,8 +27,8 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; -import java.util.Arrays; -import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.assertj.core.api.Assertions.assertThat; @@ -55,32 +55,24 @@ public void testProcedure(int bucket) throws Exception { + ")"); tEnv.executeSql("USE CATALOG mycat"); - Function runProcedure = - sql -> { - int cnt = 0; - try (CloseableIterator it = tEnv.executeSql(sql).collect()) { - while (it.hasNext()) { - cnt++; - it.next(); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - return cnt; - }; - - for (int i = 0; i < numPartitions; i++) { - assertThat( - runProcedure.apply( - "CALL sys.remove_unexisting_files(`table` => 'mydb.t', `partitions` => 'pt = " - + i - + "', `dry_run` => true)")) - .isEqualTo(numDeletes[i]); + int[] actual = new int[numPartitions]; + Pattern pattern = Pattern.compile("pt=(\\d+?)/"); + try (CloseableIterator it = + tEnv.executeSql( + "CALL sys.remove_unexisting_files(`table` => 'mydb.t', `dry_run` => true, `parallelism` => 2)") + .collect()) { + while (it.hasNext()) { + Row row = it.next(); + Matcher matcher = pattern.matcher(row.getField(0).toString()); + if (matcher.find()) { + actual[Integer.parseInt(matcher.group(1))]++; + } + } } + assertThat(actual).isEqualTo(numDeletes); - assertThat(runProcedure.apply("CALL sys.remove_unexisting_files(`table` => 'mydb.t')")) - .isEqualTo(Arrays.stream(numDeletes).sum()); - + tEnv.executeSql("CALL sys.remove_unexisting_files(`table` => 'mydb.t', `parallelism` => 2)") + .await(); try (CloseableIterator it = tEnv.executeSql("SELECT pt, CAST(COUNT(*) AS INT) FROM mydb.t GROUP BY pt") .collect()) { From fa4200b0da9de4cf27a1cb3216984516c3dbac57 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Mon, 13 Jan 2025 14:26:30 +0800 Subject: [PATCH 5/8] [fix] Add spark procedure --- .../paimon/operation/ListUnexistingFiles.java | 35 +++-- .../operation/ListUnexistingFilesTest.java | 25 +++- .../action/RemoveUnexistingFilesAction.java | 4 +- .../RemoveUnexistingFilesActionITCase.java | 2 +- .../RemoveUnexistingFilesProcedureITCase.java | 2 +- .../apache/paimon/spark/SparkProcedures.java | 2 + .../procedure/RemoveOrphanFilesProcedure.java | 1 - .../RemoveUnexistingFilesProcedure.java | 119 +++++++++++++++++ .../SparkOrphanFilesClean.scala | 2 +- .../SparkRemoveUnexistingFiles.scala | 125 ++++++++++++++++++ .../RemoveUnexistingFilesProcedureTest.scala | 74 +++++++++++ paimon-spark/pom.xml | 8 ++ 12 files changed, 380 insertions(+), 19 deletions(-) create mode 100644 paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedure.java 
rename paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/{orphan => procedure}/SparkOrphanFilesClean.scala (99%) create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala create mode 100644 paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedureTest.scala diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java b/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java index 29082ed265e9..a704a329c71a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ListUnexistingFiles.java @@ -30,11 +30,12 @@ import java.io.IOException; import java.io.UncheckedIOException; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool; @@ -55,31 +56,49 @@ public ListUnexistingFiles(FileStoreTable table) { } public Map> list(BinaryRow partition) throws Exception { - Map> result = new ConcurrentHashMap<>(); + Map> result = new HashMap<>(); List splits = table.newScan() .withPartitionFilter(Collections.singletonList(partition)) .plan() .splits(); - ThreadPoolUtils.randomlyOnlyExecute( - executor, split -> listFilesInDataSplit((DataSplit) split, result), splits); + Iterator it = + ThreadPoolUtils.randomlyExecuteSequentialReturn( + executor, split -> listFilesInDataSplit((DataSplit) split), splits); + while (it.hasNext()) { + ListResult item = it.next(); + result.computeIfAbsent(item.bucket, k -> new HashMap<>()).put(item.path, item.meta); + } return result; } - private void listFilesInDataSplit( - DataSplit dataSplit, Map> result) { + private List listFilesInDataSplit(DataSplit dataSplit) { + List results = new ArrayList<>(); DataFilePathFactory dataFilePathFactory = pathFactory.createDataFilePathFactory(dataSplit.partition(), dataSplit.bucket()); for (DataFileMeta meta : dataSplit.dataFiles()) { Path path = dataFilePathFactory.toPath(meta); try { if (!table.fileIO().exists(path)) { - result.computeIfAbsent(dataSplit.bucket(), k -> new HashMap<>()) - .put(path.toString(), meta); + results.add(new ListResult(dataSplit.bucket(), path.toString(), meta)); } } catch (IOException e) { throw new UncheckedIOException("Cannot determine if file " + path + " exists.", e); } } + return results; + } + + private static class ListResult { + + private final int bucket; + private final String path; + private final DataFileMeta meta; + + private ListResult(int bucket, String path, DataFileMeta meta) { + this.bucket = bucket; + this.path = path; + this.meta = meta; + } } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ListUnexistingFilesTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ListUnexistingFilesTest.java index 0ecad4120d9b..7936aa517349 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ListUnexistingFilesTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ListUnexistingFilesTest.java @@ -65,7 +65,8 @@ public void testListFiles(int bucket) throws Exception { int numFiles = 10; int[] numDeletes = new int[numPartitions]; FileStoreTable table = - 
prepareRandomlyDeletedTable(tempDir.toString(), bucket, numFiles, numDeletes); + prepareRandomlyDeletedTable( + tempDir.toString(), "mydb", "t", bucket, numFiles, numDeletes); Function binaryRow = i -> { @@ -84,7 +85,13 @@ public void testListFiles(int bucket) throws Exception { } public static FileStoreTable prepareRandomlyDeletedTable( - String warehouse, int bucket, int numFiles, int[] numDeletes) throws Exception { + String warehouse, + String databaseName, + String tableName, + int bucket, + int numFiles, + int[] numDeletes) + throws Exception { RowType rowType = RowType.of( new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT()}, @@ -96,7 +103,13 @@ public static FileStoreTable prepareRandomlyDeletedTable( options.put(CoreOptions.BUCKET_KEY.key(), "id"); } FileStoreTable table = - createPaimonTable(warehouse, rowType, Collections.singletonList("pt"), options); + createPaimonTable( + warehouse, + databaseName, + tableName, + rowType, + Collections.singletonList("pt"), + options); String commitUser = UUID.randomUUID().toString(); TableWriteImpl write = table.newWrite(commitUser); @@ -141,6 +154,8 @@ public static FileStoreTable prepareRandomlyDeletedTable( private static FileStoreTable createPaimonTable( String warehouse, + String databaseName, + String tableName, RowType rowType, List partitionKeys, Map customOptions) @@ -157,8 +172,8 @@ private static FileStoreTable createPaimonTable( ""); try (FileSystemCatalog paimonCatalog = new FileSystemCatalog(fileIO, path)) { - paimonCatalog.createDatabase("mydb", false); - Identifier paimonIdentifier = Identifier.create("mydb", "t"); + paimonCatalog.createDatabase(databaseName, true); + Identifier paimonIdentifier = Identifier.create(databaseName, tableName); paimonCatalog.createTable(paimonIdentifier, schema, false); return (FileStoreTable) paimonCatalog.getTable(paimonIdentifier); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java index 028a59771d88..8d014d66ee02 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RemoveUnexistingFilesAction.java @@ -106,7 +106,7 @@ public void build() throws Exception { public DataStream buildDataStream() throws Exception { FileStoreTable fileStoreTable = (FileStoreTable) table; - List binaryPartitions = ((FileStoreTable) table).newScan().listPartitions(); + List binaryPartitions = fileStoreTable.newScan().listPartitions(); SingleOutputStreamOperator source = env.fromData( @@ -123,7 +123,7 @@ public DataStream buildDataStream() throws Exception { new CommittableTypeInfo(), new WorkerOperator(fileStoreTable)); if (parallelism != null) { - worker = worker.setParallelism(parallelism); + worker = worker.setParallelism(Math.min(parallelism, binaryPartitions.size())); } DataStream result = worker.getSideOutput(RESULT_SIDE_OUTPUT); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionITCase.java index ef9059008372..0b32177a50e8 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionITCase.java +++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveUnexistingFilesActionITCase.java @@ -41,7 +41,7 @@ public void testAction(int bucket) throws Exception { int numFiles = 10; int[] numDeletes = new int[numPartitions]; ListUnexistingFilesTest.prepareRandomlyDeletedTable( - warehouse, bucket, numFiles, numDeletes); + warehouse, "mydb", "t", bucket, numFiles, numDeletes); RemoveUnexistingFilesAction action = createAction( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedureITCase.java index f983784146ed..b7f20d80243f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedureITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/RemoveUnexistingFilesProcedureITCase.java @@ -43,7 +43,7 @@ public void testProcedure(int bucket) throws Exception { int numFiles = 10; int[] numDeletes = new int[numPartitions]; ListUnexistingFilesTest.prepareRandomlyDeletedTable( - warehouse, bucket, numFiles, numDeletes); + warehouse, "mydb", "t", bucket, numFiles, numDeletes); TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); tEnv.executeSql( diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java index f5052ea25f95..06f747f606c3 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkProcedures.java @@ -38,6 +38,7 @@ import org.apache.paimon.spark.procedure.PurgeFilesProcedure; import org.apache.paimon.spark.procedure.RefreshObjectTableProcedure; import org.apache.paimon.spark.procedure.RemoveOrphanFilesProcedure; +import org.apache.paimon.spark.procedure.RemoveUnexistingFilesProcedure; import org.apache.paimon.spark.procedure.RenameTagProcedure; import org.apache.paimon.spark.procedure.RepairProcedure; import org.apache.paimon.spark.procedure.ReplaceTagProcedure; @@ -90,6 +91,7 @@ private static Map> initProcedureBuilders() { procedureBuilders.put("migrate_table", MigrateTableProcedure::builder); procedureBuilders.put("migrate_file", MigrateFileProcedure::builder); procedureBuilders.put("remove_orphan_files", RemoveOrphanFilesProcedure::builder); + procedureBuilders.put("remove_unexisting_files", RemoveUnexistingFilesProcedure::builder); procedureBuilders.put("expire_snapshots", ExpireSnapshotsProcedure::builder); procedureBuilders.put("expire_partitions", ExpirePartitionsProcedure::builder); procedureBuilders.put("repair", RepairProcedure::builder); diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java index dd5826420036..604b1d9b1957 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveOrphanFilesProcedure.java @@ -23,7 +23,6 @@ import org.apache.paimon.operation.LocalOrphanFilesClean; import org.apache.paimon.operation.OrphanFilesClean; 
import org.apache.paimon.spark.catalog.WithPaimonCatalog; -import org.apache.paimon.spark.orphan.SparkOrphanFilesClean; import org.apache.paimon.utils.Preconditions; import org.apache.spark.sql.catalyst.InternalRow; diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedure.java new file mode 100644 index 000000000000..555fcb145706 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedure.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure; + +import org.apache.paimon.spark.catalog.WithPaimonCatalog; +import org.apache.paimon.utils.Preconditions; + +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.connector.catalog.TableCatalog; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.unsafe.types.UTF8String; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; + +/** + * Procedure to remove unexisting data files from manifest entries. See {@code + * RemoveUnexistingFilesAction} in {@code paimon-flink-common} module for detailed use cases. + * + *
+ * <pre><code>
+ *  -- remove unexisting data files in the table `mydb.myt`
    + *  CALL sys.remove_unexisting_files(`table` => 'mydb.myt')
    + *
    + *  -- only check what files will be removed, but not really remove them (dry run)
+ *  CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `dry_run` => true)
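+ *
+ *  -- a dry run with an explicit parallelism (the value 8 is only an illustrative choice)
+ *  CALL sys.remove_unexisting_files(`table` => 'mydb.myt', `dry_run` => true, `parallelism` => 8)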
+ * </code></pre>
    + * + *

+ * <p>Note that users run this procedure at their own risk: it may cause data loss when used + * outside the use cases above. + */ +public class RemoveUnexistingFilesProcedure extends BaseProcedure { + + private static final Logger LOG = LoggerFactory.getLogger(RemoveUnexistingFilesProcedure.class); + + private static final ProcedureParameter[] PARAMETERS = + new ProcedureParameter[] { + ProcedureParameter.required("table", DataTypes.StringType), + ProcedureParameter.optional("dry_run", DataTypes.BooleanType), + ProcedureParameter.optional("parallelism", DataTypes.IntegerType) + }; + + private static final StructType OUTPUT_TYPE = + new StructType( + new StructField[] { + new StructField("fileName", DataTypes.StringType, false, Metadata.empty()) + }); + + private RemoveUnexistingFilesProcedure(TableCatalog tableCatalog) { + super(tableCatalog); + } + + @Override + public ProcedureParameter[] parameters() { + return PARAMETERS; + } + + @Override + public StructType outputType() { + return OUTPUT_TYPE; + } + + @Override + public InternalRow[] call(InternalRow args) { + String tableId = args.getString(0); + Preconditions.checkArgument( + tableId != null && !tableId.isEmpty(), + "Cannot handle an empty tableId for argument %s", + tableId); + org.apache.paimon.catalog.Identifier identifier = + org.apache.paimon.catalog.Identifier.fromString( + toIdentifier(args.getString(0), PARAMETERS[0].name()).toString()); + LOG.info("identifier is {}.", identifier); + + String[] result = + SparkRemoveUnexistingFiles.execute( + ((WithPaimonCatalog) tableCatalog()).paimonCatalog(), + identifier.getDatabaseName(), + identifier.getTableName(), + !args.isNullAt(1) && args.getBoolean(1), + args.isNullAt(2) ? null : args.getInt(2)); + return Arrays.stream(result) + .map(path -> newInternalRow(UTF8String.fromString(path))) + .toArray(InternalRow[]::new); + } + + public static ProcedureBuilder builder() { + return new BaseProcedure.Builder<RemoveUnexistingFilesProcedure>() { + @Override + public RemoveUnexistingFilesProcedure doBuild() { + return new RemoveUnexistingFilesProcedure(tableCatalog()); + } + }; + } + + @Override + public String description() { + return "RemoveUnexistingFilesProcedure"; + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala similarity index 99% rename from paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala rename to paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala index 16b896937961..328a11c01742 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/orphan/SparkOrphanFilesClean.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.paimon.spark.orphan +package org.apache.paimon.spark.procedure import org.apache.paimon.{utils, Snapshot} import org.apache.paimon.catalog.{Catalog, Identifier} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala new file mode 100644 index 000000000000..5792c740970c --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.procedure + +import org.apache.paimon.catalog.{Catalog, Identifier} +import org.apache.paimon.data.BinaryRow +import org.apache.paimon.io.{CompactIncrement, DataFileMeta, DataIncrement} +import org.apache.paimon.memory.MemorySegment +import org.apache.paimon.operation.ListUnexistingFiles +import org.apache.paimon.table.FileStoreTable +import org.apache.paimon.table.sink.{CommitMessage, CommitMessageImpl, CommitMessageSerializer} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.SQLConfHelper + +import java.util +import java.util.{Collections, UUID} + +import scala.collection.JavaConverters._ + +case class SparkRemoveUnexistingFiles( + table: FileStoreTable, + dryRun: Boolean, + parallelism: Int, + @transient spark: SparkSession) + extends SQLConfHelper + with Logging { + + private def buildRDD() = { + val binaryPartitions = table.newScan().listPartitions() + val realParallelism = Math.min(binaryPartitions.size(), parallelism) + + val numPartitionFields = table.schema().partitionKeys().size() + val pathAndMessage = spark.sparkContext + .parallelize(binaryPartitions.asScala.map(partition => partition.toBytes), realParallelism) + .mapPartitions { + iter => + { + val reuse = new BinaryRow(numPartitionFields) + val operation = new ListUnexistingFiles(table) + val serializer = new CommitMessageSerializer() + iter.flatMap( + partitionBytes => { + reuse.pointTo(MemorySegment.wrap(partitionBytes), 0, partitionBytes.length) + operation.list(reuse).asScala.map { + case (bucket, metaMap) => + val message = new CommitMessageImpl( + reuse, + bucket, + new DataIncrement( + Collections.emptyList(), + new util.ArrayList[DataFileMeta](metaMap.values()), + Collections.emptyList()), + CompactIncrement.emptyIncrement()) + (metaMap.keySet().asScala.toSeq, serializer.serialize(message)) + } + }) + } + } + .repartition(1) + .cache() + + if (!dryRun) { + pathAndMessage.foreachPartition { + iter => + { + val serializer = new CommitMessageSerializer() + val messages = new 
util.ArrayList[CommitMessage]() + iter.foreach { + case (_, bytes) => messages.add(serializer.deserialize(serializer.getVersion, bytes)) + } + val commit = table.newCommit(UUID.randomUUID().toString) + commit.commit(Long.MaxValue, messages) + } + } + + pathAndMessage.mapPartitions( + iter => { + iter.flatMap { case (paths, _) => paths } + }) + } +} + +object SparkRemoveUnexistingFiles extends SQLConfHelper { + + def execute( + catalog: Catalog, + databaseName: String, + tableName: String, + dryRun: Boolean, + parallelismOpt: Integer): Array[String] = { + val spark = SparkSession.active + val parallelism = if (parallelismOpt == null) { + Math.max(spark.sparkContext.defaultParallelism, conf.numShufflePartitions) + } else { + parallelismOpt.intValue() + } + + val identifier = new Identifier(databaseName, tableName) + val table = catalog.getTable(identifier) + assert( + table.isInstanceOf[FileStoreTable], + s"Only FileStoreTable supports remove-unexisting-files action. The table type is '${table.getClass.getName}'.") + val fileStoreTable = table.asInstanceOf[FileStoreTable] + SparkRemoveUnexistingFiles(fileStoreTable, dryRun, parallelism, spark).buildRDD().collect() + } +} diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedureTest.scala new file mode 100644 index 000000000000..4b327ad6c9bd --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RemoveUnexistingFilesProcedureTest.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.procedure + +import org.apache.paimon.operation.ListUnexistingFilesTest +import org.apache.paimon.spark.PaimonSparkTestBase + +import java.util.UUID + +class RemoveUnexistingFilesProcedureTest extends PaimonSparkTestBase { + + test("Paimon procedure: remove unexisting files, bucket = -1") { + testImpl(-1) + } + + test("Paimon procedure: remove unexisting files, bucket = 3") { + testImpl(3) + } + + private def testImpl(bucket: Int): Unit = { + val warehouse = tempDBDir.getCanonicalPath + + val numPartitions = 2 + val numFiles = 10 + val numDeletes = new Array[Int](numPartitions) + val tableName = "t_" + UUID.randomUUID().toString.replace("-", "_") + ListUnexistingFilesTest.prepareRandomlyDeletedTable( + warehouse, + "mydb", + tableName, + bucket, + numFiles, + numDeletes) + + val actual = new Array[Int](numPartitions) + val pattern = "pt=(\\d+?)/".r + spark.sql(s"USE mydb") + spark + .sql(s"CALL sys.remove_unexisting_files(table => '$tableName', dry_run => true)") + .collect() + .foreach( + r => { + pattern.findFirstMatchIn(r.getString(0)) match { + case Some(m) => actual(m.group(1).toInt) += 1 + } + }) + assert(actual.toSeq == numDeletes.toSeq) + + spark.sql(s"CALL sys.remove_unexisting_files(table => '$tableName')") + spark + .sql(s"SELECT pt, CAST(COUNT(*) AS INT) FROM $tableName GROUP BY pt") + .collect() + .foreach( + r => { + assert(r.getInt(1) == numFiles - numDeletes(r.getInt(0))) + }) + } +} diff --git a/paimon-spark/pom.xml b/paimon-spark/pom.xml index 64c966bd2e21..93c742b15af4 100644 --- a/paimon-spark/pom.xml +++ b/paimon-spark/pom.xml @@ -235,6 +235,14 @@ under the License. test + + org.apache.paimon + paimon-core + ${project.version} + test + test-jar + + org.apache.paimon paimon-hive-common From a76ab8a6dceff882eb68091e21c6a940660d5305 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Mon, 13 Jan 2025 14:30:57 +0800 Subject: [PATCH 6/8] [fix] Add docs for Spark procedure --- docs/content/spark/procedures.md | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/docs/content/spark/procedures.md b/docs/content/spark/procedures.md index bf7b8ae2d572..8e32c2201f7f 100644 --- a/docs/content/spark/procedures.md +++ b/docs/content/spark/procedures.md @@ -256,6 +256,24 @@ This section introduce all available spark procedures about paimon. CALL sys.remove_orphan_files(table => 'default.T', older_than => '2023-10-31 12:00:00', dry_run => true, parallelism => '5', mode => 'local') + + remove_unexisting_files + + Procedure to remove unexisting data files from manifest entries. See Java docs for detailed use cases. Arguments: +

  • identifier: the target table identifier. Cannot be empty. You can use database_name.* to clean the whole database.
  • dryRun (optional): only check what files will be removed, but not really remove them. Default is false.
  • parallelism (optional): the parallelism used to check files in the manifests.
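  For illustration, the optional arguments can be combined in a single call (the parallelism value below is an arbitrary example): CALL sys.remove_unexisting_files(table => 'mydb.myt', dry_run => true, parallelism => 8)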
+ Note that users run this procedure at their own risk: it may cause data loss when used outside the use cases listed in the Java docs. + + + -- remove unexisting data files in the table `mydb.myt` + CALL sys.remove_unexisting_files(table => 'mydb.myt') +
+ -- only check what files will be removed, but not really remove them (dry run) + CALL sys.remove_unexisting_files(table => 'mydb.myt', dry_run => true) + + + repair From 303abc92172a79702493408f219692c99d2a7fd2 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Tue, 14 Jan 2025 10:16:34 +0800 Subject: [PATCH 7/8] [fix] Fix compile error --- .../paimon/spark/procedure/SparkRemoveUnexistingFiles.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala index 5792c740970c..2498dd441859 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala @@ -49,7 +49,7 @@ case class SparkRemoveUnexistingFiles( val numPartitionFields = table.schema().partitionKeys().size() val pathAndMessage = spark.sparkContext - .parallelize(binaryPartitions.asScala.map(partition => partition.toBytes), realParallelism) + .parallelize(binaryPartitions.asScala.map(partition => partition.toBytes).toSeq, realParallelism) .mapPartitions { iter => { From 54ea6607abb3e02fae17f8227e73fd366c4b7fc6 Mon Sep 17 00:00:00 2001 From: tsreaper Date: Tue, 14 Jan 2025 10:21:59 +0800 Subject: [PATCH 8/8] [fix] Fix format --- .../paimon/spark/procedure/SparkRemoveUnexistingFiles.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala index 2498dd441859..d37099388238 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkRemoveUnexistingFiles.scala @@ -49,7 +49,9 @@ case class SparkRemoveUnexistingFiles( val numPartitionFields = table.schema().partitionKeys().size() val pathAndMessage = spark.sparkContext - .parallelize(binaryPartitions.asScala.map(partition => partition.toBytes).toSeq, realParallelism) + .parallelize( + binaryPartitions.asScala.map(partition => partition.toBytes).toSeq, + realParallelism) .mapPartitions { iter => {