From 51db5be5fb63b8cc938f7a626d0a72bb9e1c307d Mon Sep 17 00:00:00 2001 From: yuzelin Date: Mon, 6 Jan 2025 14:38:23 +0800 Subject: [PATCH] [hotfix] Refactor RemoveOrphanFilesActionITCase in flink-1.18 to avoid flaky test --- .../flink/RemoveOrphanFilesActionITCase.java | 289 +----------------- .../RemoveOrphanFilesActionITCaseBase.java | 13 + 2 files changed, 17 insertions(+), 285 deletions(-) diff --git a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java index 5a9cd0fe2d9a..a3bcbda12f8d 100644 --- a/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java +++ b/paimon-flink/paimon-flink-1.18/src/test/java/org/apache/paimon/flink/RemoveOrphanFilesActionITCase.java @@ -18,294 +18,13 @@ package org.apache.paimon.flink; -import org.apache.paimon.CoreOptions; -import org.apache.paimon.data.BinaryString; -import org.apache.paimon.data.GenericRow; -import org.apache.paimon.flink.action.ActionITCaseBase; import org.apache.paimon.flink.action.RemoveOrphanFilesAction; -import org.apache.paimon.fs.FileIO; -import org.apache.paimon.fs.Path; -import org.apache.paimon.options.Options; -import org.apache.paimon.schema.SchemaChange; -import org.apache.paimon.schema.SchemaManager; -import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.FileStoreTableFactory; -import org.apache.paimon.table.sink.StreamTableCommit; -import org.apache.paimon.table.sink.StreamTableWrite; -import org.apache.paimon.table.sink.StreamWriteBuilder; -import org.apache.paimon.types.DataType; -import org.apache.paimon.types.DataTypes; -import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.DateTimeUtils; - -import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; - -import org.apache.flink.types.Row; -import org.apache.flink.util.CloseableIterator; -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.ThreadLocalRandom; - -import static org.apache.paimon.CoreOptions.SCAN_FALLBACK_BRANCH; -import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.assertThatCode; +import org.apache.paimon.flink.action.RemoveOrphanFilesActionITCaseBase; /** IT cases base for {@link RemoveOrphanFilesAction} in Flink 1.18. */ -public class RemoveOrphanFilesActionITCase extends ActionITCaseBase { - private static final String ORPHAN_FILE_1 = "bucket-0/orphan_file1"; - private static final String ORPHAN_FILE_2 = "bucket-0/orphan_file2"; - - private FileStoreTable createTableAndWriteData(String tableName) throws Exception { - RowType rowType = - RowType.of( - new DataType[] {DataTypes.BIGINT(), DataTypes.STRING()}, - new String[] {"k", "v"}); - - FileStoreTable table = - createFileStoreTable( - tableName, - rowType, - Collections.emptyList(), - Collections.singletonList("k"), - Collections.emptyList(), - Collections.emptyMap()); - - StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder().withCommitUser(commitUser); - write = writeBuilder.newWrite(); - commit = writeBuilder.newCommit(); - - writeData(rowData(1L, BinaryString.fromString("Hi"))); - - Path orphanFile1 = getOrphanFilePath(table, ORPHAN_FILE_1); - Path orphanFile2 = getOrphanFilePath(table, ORPHAN_FILE_2); - - FileIO fileIO = table.fileIO(); - fileIO.writeFile(orphanFile1, "a", true); - Thread.sleep(2000); - fileIO.writeFile(orphanFile2, "b", true); - - return table; - } - - private Path getOrphanFilePath(FileStoreTable table, String orphanFile) { - return new Path(table.location(), orphanFile); - } - - @Test - public void testRunWithoutException() throws Exception { - createTableAndWriteData(tableName); - - List args = - new ArrayList<>( - Arrays.asList( - "remove_orphan_files", - "--warehouse", - warehouse, - "--database", - database, - "--table", - tableName)); - RemoveOrphanFilesAction action1 = createAction(RemoveOrphanFilesAction.class, args); - assertThatCode(action1::run).doesNotThrowAnyException(); - - args.add("--older_than"); - args.add("2023-12-31 23:59:59"); - RemoveOrphanFilesAction action2 = createAction(RemoveOrphanFilesAction.class, args); - assertThatCode(action2::run).doesNotThrowAnyException(); - - String withoutOlderThan = - String.format("CALL sys.remove_orphan_files('%s.%s')", database, tableName); - - CloseableIterator withoutOlderThanCollect = executeSQL(withoutOlderThan); - assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0")); - - String olderThan = - DateTimeUtils.formatLocalDateTime( - DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3); - String withDryRun = - String.format( - "CALL sys.remove_orphan_files('%s.%s', '%s', true)", - database, tableName, olderThan); - ImmutableList actualDryRunDeleteFile = ImmutableList.copyOf(executeSQL(withDryRun)); - assertThat(actualDryRunDeleteFile).containsOnly(Row.of("2")); - - String withOlderThan = - String.format( - "CALL sys.remove_orphan_files('%s.%s', '%s')", - database, tableName, olderThan); - ImmutableList actualDeleteFile = ImmutableList.copyOf(executeSQL(withOlderThan)); - - assertThat(actualDeleteFile).containsExactlyInAnyOrder(Row.of("2"), Row.of("2")); - } - - @Test - public void testRemoveDatabaseOrphanFilesITCase() throws Exception { - createTableAndWriteData("tableName1"); - createTableAndWriteData("tableName2"); - - List args = - new ArrayList<>( - Arrays.asList( - "remove_orphan_files", - "--warehouse", - warehouse, - "--database", - database, - "--table", - "*")); - RemoveOrphanFilesAction action1 = createAction(RemoveOrphanFilesAction.class, args); - assertThatCode(action1::run).doesNotThrowAnyException(); - - args.add("--older_than"); - args.add("2023-12-31 23:59:59"); - RemoveOrphanFilesAction action2 = createAction(RemoveOrphanFilesAction.class, args); - assertThatCode(action2::run).doesNotThrowAnyException(); - - args.add("--parallelism"); - args.add("5"); - RemoveOrphanFilesAction action3 = createAction(RemoveOrphanFilesAction.class, args); - assertThatCode(action3::run).doesNotThrowAnyException(); - - String withoutOlderThan = - String.format("CALL sys.remove_orphan_files('%s.%s')", database, "*"); - CloseableIterator withoutOlderThanCollect = executeSQL(withoutOlderThan); - assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0")); - - String withParallelism = - String.format("CALL sys.remove_orphan_files('%s.%s','',true,5)", database, "*"); - CloseableIterator withParallelismCollect = executeSQL(withParallelism); - assertThat(ImmutableList.copyOf(withParallelismCollect)).containsOnly(Row.of("0")); - - String olderThan = - DateTimeUtils.formatLocalDateTime( - DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3); - String withDryRun = - String.format( - "CALL sys.remove_orphan_files('%s.%s', '%s', true)", - database, "*", olderThan); - ImmutableList actualDryRunDeleteFile = ImmutableList.copyOf(executeSQL(withDryRun)); - assertThat(actualDryRunDeleteFile).containsOnly(Row.of("4")); - - String withOlderThan = - String.format( - "CALL sys.remove_orphan_files('%s.%s', '%s')", database, "*", olderThan); - ImmutableList actualDeleteFile = ImmutableList.copyOf(executeSQL(withOlderThan)); - - assertThat(actualDeleteFile).containsOnly(Row.of("4")); - } - - @Test - public void testCleanWithBranch() throws Exception { - // create main branch - FileStoreTable table = createTableAndWriteData(tableName); - - // create first branch and write some data - table.createBranch("br"); - SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location(), "br"); - TableSchema branchSchema = - schemaManager.commitChanges(SchemaChange.addColumn("v2", DataTypes.INT())); - Options branchOptions = new Options(branchSchema.options()); - branchOptions.set(CoreOptions.BRANCH, "br"); - branchSchema = branchSchema.copy(branchOptions.toMap()); - FileStoreTable branchTable = - FileStoreTableFactory.create(table.fileIO(), table.location(), branchSchema); - - String commitUser = UUID.randomUUID().toString(); - StreamTableWrite write = branchTable.newWrite(commitUser); - StreamTableCommit commit = branchTable.newCommit(commitUser); - write.write(GenericRow.of(2L, BinaryString.fromString("Hello"), 20)); - commit.commit(1, write.prepareCommit(false, 1)); - write.close(); - commit.close(); - - // create orphan file in snapshot directory of first branch - Path orphanFile3 = new Path(table.location(), "branch/branch-br/snapshot/orphan_file3"); - branchTable.fileIO().writeFile(orphanFile3, "x", true); - - // create second branch, which is empty - table.createBranch("br2"); - - // create orphan file in snapshot directory of second branch - Path orphanFile4 = new Path(table.location(), "branch/branch-br2/snapshot/orphan_file4"); - branchTable.fileIO().writeFile(orphanFile4, "y", true); - - if (ThreadLocalRandom.current().nextBoolean()) { - executeSQL( - String.format( - "ALTER TABLE `%s`.`%s` SET ('%s' = 'br')", - database, tableName, SCAN_FALLBACK_BRANCH.key()), - false, - true); - } - - String olderThan = - DateTimeUtils.formatLocalDateTime( - DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3); - String procedure = - String.format( - "CALL sys.remove_orphan_files('%s.%s', '%s')", database, "*", olderThan); - ImmutableList actualDeleteFile = ImmutableList.copyOf(executeSQL(procedure)); - assertThat(actualDeleteFile).containsOnly(Row.of("4")); - } - - @Test - public void testRunWithMode() throws Exception { - createTableAndWriteData(tableName); - - List args = - new ArrayList<>( - Arrays.asList( - "remove_orphan_files", - "--warehouse", - warehouse, - "--database", - database, - "--table", - tableName)); - RemoveOrphanFilesAction action1 = createAction(RemoveOrphanFilesAction.class, args); - assertThatCode(action1::run).doesNotThrowAnyException(); - - args.add("--older_than"); - args.add("2023-12-31 23:59:59"); - RemoveOrphanFilesAction action2 = createAction(RemoveOrphanFilesAction.class, args); - assertThatCode(action2::run).doesNotThrowAnyException(); - - String withoutOlderThan = - String.format("CALL sys.remove_orphan_files('%s.%s')", database, tableName); - CloseableIterator withoutOlderThanCollect = executeSQL(withoutOlderThan); - assertThat(ImmutableList.copyOf(withoutOlderThanCollect)).containsOnly(Row.of("0")); - - String olderThan = - DateTimeUtils.formatLocalDateTime( - DateTimeUtils.toLocalDateTime(System.currentTimeMillis()), 3); - String withLocalMode = - String.format( - "CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, 'local')", - database, tableName, olderThan); - ImmutableList actualLocalRunDeleteFile = - ImmutableList.copyOf(executeSQL(withLocalMode)); - assertThat(actualLocalRunDeleteFile).containsOnly(Row.of("2")); - - String withDistributedMode = - String.format( - "CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, 'distributed')", - database, tableName, olderThan); - ImmutableList actualDistributedRunDeleteFile = - ImmutableList.copyOf(executeSQL(withDistributedMode)); - assertThat(actualDistributedRunDeleteFile).containsOnly(Row.of("2")); +public class RemoveOrphanFilesActionITCase extends RemoveOrphanFilesActionITCaseBase { - String withInvalidMode = - String.format( - "CALL sys.remove_orphan_files('%s.%s', '%s', true, 5, 'unknown')", - database, tableName, olderThan); - assertThatCode(() -> executeSQL(withInvalidMode)) - .isInstanceOf(RuntimeException.class) - .hasMessageContaining("Unknown mode"); + protected boolean supportNamedArgument() { + return false; } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java index fcc3f8893f65..8bdb8979a280 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RemoveOrphanFilesActionITCaseBase.java @@ -54,6 +54,7 @@ import static org.apache.paimon.CoreOptions.SCAN_FALLBACK_BRANCH; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatCode; +import static org.junit.jupiter.api.Assumptions.assumeTrue; /** IT cases base for {@link RemoveOrphanFilesAction}. */ public abstract class RemoveOrphanFilesActionITCaseBase extends ActionITCaseBase { @@ -100,6 +101,8 @@ private Path getOrphanFilePath(FileStoreTable table, String orphanFile) { @ParameterizedTest @ValueSource(booleans = {true, false}) public void testRunWithoutException(boolean isNamedArgument) throws Exception { + assumeTrue(!isNamedArgument || supportNamedArgument()); + createTableAndWriteData(tableName); List args = @@ -160,6 +163,8 @@ public void testRunWithoutException(boolean isNamedArgument) throws Exception { @ParameterizedTest @ValueSource(booleans = {true, false}) public void testRemoveDatabaseOrphanFilesITCase(boolean isNamedArgument) throws Exception { + assumeTrue(!isNamedArgument || supportNamedArgument()); + createTableAndWriteData("tableName1"); createTableAndWriteData("tableName2"); @@ -231,6 +236,8 @@ public void testRemoveDatabaseOrphanFilesITCase(boolean isNamedArgument) throws @ParameterizedTest @ValueSource(booleans = {true, false}) public void testCleanWithBranch(boolean isNamedArgument) throws Exception { + assumeTrue(!isNamedArgument || supportNamedArgument()); + // create main branch FileStoreTable table = createTableAndWriteData(tableName); @@ -292,6 +299,8 @@ public void testCleanWithBranch(boolean isNamedArgument) throws Exception { @ParameterizedTest @ValueSource(booleans = {true, false}) public void testRunWithMode(boolean isNamedArgument) throws Exception { + assumeTrue(!isNamedArgument || supportNamedArgument()); + createTableAndWriteData(tableName); List args = @@ -361,4 +370,8 @@ public void testRunWithMode(boolean isNamedArgument) throws Exception { .isInstanceOf(RuntimeException.class) .hasMessageContaining("Unknown mode"); } + + protected boolean supportNamedArgument() { + return true; + } }