From 88d210ef9708f975a1ca0ec57c392e1d86411c91 Mon Sep 17 00:00:00 2001 From: Bijan Houle Date: Tue, 5 Apr 2022 10:51:32 -0600 Subject: [PATCH 1/2] Spark 3.2: Add `actualFilesTable` param to DeleteOrphanFiles action/procedure This allows the user to specify an existing table of actual files for the DeleteOrphanFiles action, skipping the directory listing. --- .../iceberg/actions/DeleteOrphanFiles.java | 13 +++++++ .../TestRemoveOrphanFilesProcedure.java | 36 +++++++++++++++++++ .../BaseDeleteOrphanFilesSparkAction.java | 35 ++++++++++++++++-- .../RemoveOrphanFilesProcedure.java | 14 +++++++- .../actions/TestRemoveOrphanFilesAction.java | 12 ++++++- 5 files changed, 106 insertions(+), 4 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java index 4ee75a6e7990..fc41ab3f1296 100644 --- a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java @@ -80,6 +80,19 @@ public interface DeleteOrphanFiles extends Action sql( "CALL %s.system.remove_orphan_files(table => '%s', max_concurrent_deletes => %s)", catalogName, tableIdent, -1)); + + AssertHelpers.assertThrows("Should reject calls with both actual_files_table and location args", + IllegalArgumentException.class, "actual_files_table cannot be used with", + () -> sql( + "CALL %s.system.remove_orphan_files(table => '%s', actual_files_table => '', location => '')", + catalogName, tableIdent)); + + AssertHelpers.assertThrows("Should reject calls with both actual_files_table and older_than args", + IllegalArgumentException.class, "actual_files_table cannot be used with", + () -> sql( + "CALL %s.system.remove_orphan_files(table => '%s', actual_files_table => '', older_than => TIMESTAMP '%s')", + catalogName, tableIdent, "1000-01-01 00:00:00")); + + AssertHelpers.assertThrows("Should throw an error if actual_files_table does not exist", + ValidationException.class, "does not exist", + () -> sql( + "CALL %s.system.remove_orphan_files(table => '%s', actual_files_table => 'missing')", + catalogName, tableIdent)); + + String tempViewName = "actual_files_test"; + spark.emptyDataFrame().createOrReplaceTempView(tempViewName); + + AssertHelpers.assertThrows("Should throw an error if actual_files_table is missing required column", + ValidationException.class, "is missing required `file_path` column", + () -> sql( + "CALL %s.system.remove_orphan_files(table => '%s', actual_files_table => '%s')", + catalogName, tableIdent, tempViewName)); + + spark.emptyDataFrame().createOrReplaceTempView(tempViewName); + spark.createDataset(Lists.newArrayList(), Encoders.INT()).toDF("file_path").createOrReplaceTempView(tempViewName); + + AssertHelpers.assertThrows("Should throw an error if actual_files_table has wrong schema", + ValidationException.class, "Invalid schema", + () -> sql( + "CALL %s.system.remove_orphan_files(table => '%s', actual_files_table => '%s')", + catalogName, tableIdent, tempViewName)); } @Test diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java index e9d1ee8d605d..5ef291ec5022 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java @@ 
-53,6 +53,8 @@ import org.apache.spark.sql.expressions.UserDefinedFunction; import org.apache.spark.sql.functions; import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; import org.apache.spark.util.SerializableConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +71,14 @@ * removes unreachable files that are older than 3 days using {@link Table#io()}. The behavior can be modified * by passing a custom location to {@link #location} and a custom timestamp to {@link #olderThan(long)}. * For example, someone might point this action to the data folder to clean up only orphan data files. - * In addition, there is a way to configure an alternative delete method via {@link #deleteWith(Consumer)}. + *
<p>
+ * Configure an alternative delete method using {@link #deleteWith(Consumer)}. + *
<p>
+ * For full control of the set of files being evaluated, use the {@link #actualFilesTable(String)} method. This + * skips the directory listing: any files in the provided actualFilesTable that are not found in table metadata will + * be deleted. + * Not compatible with the `location` or `older_than` arguments; this assumes that the provided table of actual files + * has been filtered down to the table's location and only includes files older than a reasonable retention interval. *
<p>
* Note: It is dangerous to call this action with a short retention interval as it might corrupt * the state of the table if another operation is writing at the same time. @@ -99,6 +108,7 @@ public void accept(String file) { private String location = null; private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3); + private Dataset providedActualFilesDF; private Consumer deleteFunc = defaultDelete; private ExecutorService deleteExecutorService = null; @@ -144,6 +154,27 @@ public BaseDeleteOrphanFilesSparkAction deleteWith(Consumer newDeleteFun return this; } + @Override + public BaseDeleteOrphanFilesSparkAction actualFilesTable(String tableName) { + ValidationException.check( + spark().catalog().tableExists(tableName), + "actualFilesTable `" + tableName + "` does not exist"); + + try { + StructType schema = spark().table(tableName).schema(); + StructField filePathField = schema.apply("file_path"); + ValidationException.check( + filePathField.dataType() == DataTypes.StringType, + "Invalid schema for actual files table - 'file_path' column is not a string type"); + } catch (IllegalArgumentException e) { + throw new ValidationException( + "actualFilesTable `" + tableName + "` is missing required `file_path` column"); + } + + this.providedActualFilesDF = spark().table(tableName); + return this; + } + @Override public DeleteOrphanFiles.Result execute() { JobGroupInfo info = newJobGroupInfo("DELETE-ORPHAN-FILES", jobDesc()); @@ -163,7 +194,7 @@ private DeleteOrphanFiles.Result doExecute() { Dataset validContentFileDF = buildValidContentFileDF(table); Dataset validMetadataFileDF = buildValidMetadataFileDF(table); Dataset validFileDF = validContentFileDF.union(validMetadataFileDF); - Dataset actualFileDF = buildActualFileDF(); + Dataset actualFileDF = this.providedActualFilesDF == null ? buildActualFileDF() : providedActualFilesDF; Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH)); Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH)); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java index 7dbdbc4a44ab..9b5037a46de9 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java @@ -49,7 +49,8 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure { ProcedureParameter.optional("older_than", DataTypes.TimestampType), ProcedureParameter.optional("location", DataTypes.StringType), ProcedureParameter.optional("dry_run", DataTypes.BooleanType), - ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType) + ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType), + ProcedureParameter.optional("actual_files_table", DataTypes.StringType) }; private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{ @@ -80,16 +81,23 @@ public StructType outputType() { } @Override + @SuppressWarnings("checkstyle:CyclomaticComplexity") public InternalRow[] call(InternalRow args) { Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name()); Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1)); String location = args.isNullAt(2) ? null : args.getString(2); boolean dryRun = args.isNullAt(3) ? 
false : args.getBoolean(3); Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4); + String tableWithActualFilePaths = args.isNullAt(5) ? null : args.getString(5); Preconditions.checkArgument(maxConcurrentDeletes == null || maxConcurrentDeletes > 0, "max_concurrent_deletes should have value > 0, value: " + maxConcurrentDeletes); + Preconditions.checkArgument( + tableWithActualFilePaths == null || (location == null && olderThanMillis == null), + "actual_files_table cannot be used with `location` or `older_than`" + ); + return withIcebergTable(tableIdent, table -> { DeleteOrphanFiles action = actions().deleteOrphanFiles(table); @@ -113,6 +121,10 @@ public InternalRow[] call(InternalRow args) { action.executeDeleteWith(executorService(maxConcurrentDeletes, "remove-orphans")); } + if (tableWithActualFilePaths != null) { + action.actualFilesTable(tableWithActualFilePaths); + } + DeleteOrphanFiles.Result result = action.execute(); return toOutputRows(result); diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java index a62c7ab53136..7405136c380f 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java @@ -150,10 +150,20 @@ public void testDryRun() throws IOException, InterruptedException { Assert.assertEquals("Action should find 1 file", invalidFiles, result2.orphanFileLocations()); Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0)))); + String actualFilesTableName = "actualFilesTable"; + spark.createDataset(allFiles, Encoders.STRING()).toDF("file_path").createOrReplaceTempView(actualFilesTableName); DeleteOrphanFiles.Result result3 = actions.deleteOrphanFiles(table) + .deleteWith(s -> { }) + .actualFilesTable(actualFilesTableName) + .execute(); + + Assert.assertEquals("Action should find 1 file", invalidFiles, result3.orphanFileLocations()); + Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0)))); + + DeleteOrphanFiles.Result result4 = actions.deleteOrphanFiles(table) .olderThan(System.currentTimeMillis()) .execute(); - Assert.assertEquals("Action should delete 1 file", invalidFiles, result3.orphanFileLocations()); + Assert.assertEquals("Action should delete 1 file", invalidFiles, result4.orphanFileLocations()); Assert.assertFalse("Invalid file should not be present", fs.exists(new Path(invalidFiles.get(0)))); List expectedRecords = Lists.newArrayList(); From 2cbe301c7b12ac5b05b90c5b2b9e67c53b6898ca Mon Sep 17 00:00:00 2001 From: Bijan Houle Date: Wed, 20 Apr 2022 12:54:54 -0600 Subject: [PATCH 2/2] Review feedback - action: rename actualFilesTable -> compareToFileList - procedure: rename actual_files_table -> file_list_view - interface change: remove from DeleteOrphanFiles, add to BaseDeleteOrphanFilesSparkAction - apply location and last_modified filtering to compareToFileList - refactor test cases --- .../iceberg/actions/DeleteOrphanFiles.java | 13 -- .../TestRemoveOrphanFilesProcedure.java | 70 +++++----- .../BaseDeleteOrphanFilesSparkAction.java | 56 ++++---- .../spark/actions/BaseSparkAction.java | 1 + .../RemoveOrphanFilesProcedure.java | 14 +- .../actions/TestRemoveOrphanFilesAction.java | 130 ++++++++++++++++-- .../source/FilePathLastModifiedRecord.java | 78 +++++++++++ 7 files 
changed, 273 insertions(+), 89 deletions(-) create mode 100644 spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/FilePathLastModifiedRecord.java diff --git a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java index fc41ab3f1296..4ee75a6e7990 100644 --- a/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java +++ b/api/src/main/java/org/apache/iceberg/actions/DeleteOrphanFiles.java @@ -80,19 +80,6 @@ public interface DeleteOrphanFiles extends Action '%s', max_concurrent_deletes => %s)", catalogName, tableIdent, -1)); - AssertHelpers.assertThrows("Should reject calls with both actual_files_table and location args", - IllegalArgumentException.class, "actual_files_table cannot be used with", - () -> sql( - "CALL %s.system.remove_orphan_files(table => '%s', actual_files_table => '', location => '')", - catalogName, tableIdent)); - - AssertHelpers.assertThrows("Should reject calls with both actual_files_table and older_than args", - IllegalArgumentException.class, "actual_files_table cannot be used with", - () -> sql( - "CALL %s.system.remove_orphan_files(table => '%s', actual_files_table => '', older_than => TIMESTAMP '%s')", - catalogName, tableIdent, "1000-01-01 00:00:00")); - - AssertHelpers.assertThrows("Should throw an error if actual_files_table does not exist", - ValidationException.class, "does not exist", - () -> sql( - "CALL %s.system.remove_orphan_files(table => '%s', actual_files_table => 'missing')", - catalogName, tableIdent)); - - String tempViewName = "actual_files_test"; + String tempViewName = "file_list_test"; spark.emptyDataFrame().createOrReplaceTempView(tempViewName); - AssertHelpers.assertThrows("Should throw an error if actual_files_table is missing required column", - ValidationException.class, "is missing required `file_path` column", - () -> sql( - "CALL %s.system.remove_orphan_files(table => '%s', actual_files_table => '%s')", - catalogName, tableIdent, tempViewName)); - - spark.emptyDataFrame().createOrReplaceTempView(tempViewName); - spark.createDataset(Lists.newArrayList(), Encoders.INT()).toDF("file_path").createOrReplaceTempView(tempViewName); - - AssertHelpers.assertThrows("Should throw an error if actual_files_table has wrong schema", - ValidationException.class, "Invalid schema", - () -> sql( - "CALL %s.system.remove_orphan_files(table => '%s', actual_files_table => '%s')", - catalogName, tableIdent, tempViewName)); + AssertHelpers.assertThrows( + "Should throw an error if file_list_view is missing required columns", + IllegalArgumentException.class, + "does not exist. 
Available:", + () -> + sql( + "CALL %s.system.remove_orphan_files(table => '%s', file_list_view => '%s')", + catalogName, tableIdent, tempViewName)); + + spark + .createDataset(Lists.newArrayList(), Encoders.tuple(Encoders.INT(), Encoders.TIMESTAMP())) + .toDF("file_path", "last_modified") + .createOrReplaceTempView(tempViewName); + + AssertHelpers.assertThrows( + "Should throw an error if file_path has wrong type", + IllegalArgumentException.class, + "Invalid file_path column", + () -> + sql( + "CALL %s.system.remove_orphan_files(table => '%s', file_list_view => '%s')", + catalogName, tableIdent, tempViewName)); + + spark + .createDataset(Lists.newArrayList(), Encoders.tuple(Encoders.STRING(), Encoders.STRING())) + .toDF("file_path", "last_modified") + .createOrReplaceTempView(tempViewName); + + AssertHelpers.assertThrows( + "Should throw an error if last_modified has wrong type", + IllegalArgumentException.class, + "Invalid last_modified column", + () -> + sql( + "CALL %s.system.remove_orphan_files(table => '%s', file_list_view => '%s')", + catalogName, tableIdent, tempViewName)); } @Test diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java index 5ef291ec5022..9b8011856839 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseDeleteOrphanFilesSparkAction.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; +import java.sql.Timestamp; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; @@ -38,6 +39,7 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.hadoop.HiddenPathFilter; import org.apache.iceberg.relocated.com.google.common.base.Joiner; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.JobGroupInfo; import org.apache.iceberg.util.PropertyUtil; @@ -74,11 +76,9 @@ *
<p>
* Configure an alternative delete method using {@link #deleteWith(Consumer)}. *
<p>
- * For full control of the set of files being evaluated, use the {@link #actualFilesTable(String)} method. This - * skips the directory listing: any files in the provided actualFilesTable that are not found in table metadata will - * be deleted. - * Not compatible with the `location` or `older_than` arguments; this assumes that the provided table of actual files - * has been filtered down to the table's location and only includes files older than a reasonable retention interval. + * For full control of the set of files being evaluated, use the {@link #compareToFileList(Dataset)} method. This + * skips the directory listing: any files in the provided dataset that are not found in table metadata will + * be deleted, using the same {@link Table#location()} and {@link #olderThan(long)} filtering as above. *
<p>
* Note: It is dangerous to call this action with a short retention interval as it might corrupt * the state of the table if another operation is writing at the same time. @@ -108,7 +108,7 @@ public void accept(String file) { private String location = null; private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3); - private Dataset providedActualFilesDF; + private Dataset compareToFileList; private Consumer deleteFunc = defaultDelete; private ExecutorService deleteExecutorService = null; @@ -154,27 +154,37 @@ public BaseDeleteOrphanFilesSparkAction deleteWith(Consumer newDeleteFun return this; } - @Override - public BaseDeleteOrphanFilesSparkAction actualFilesTable(String tableName) { - ValidationException.check( - spark().catalog().tableExists(tableName), - "actualFilesTable `" + tableName + "` does not exist"); + public BaseDeleteOrphanFilesSparkAction compareToFileList(Dataset files) { + StructType schema = files.schema(); - try { - StructType schema = spark().table(tableName).schema(); - StructField filePathField = schema.apply("file_path"); - ValidationException.check( - filePathField.dataType() == DataTypes.StringType, - "Invalid schema for actual files table - 'file_path' column is not a string type"); - } catch (IllegalArgumentException e) { - throw new ValidationException( - "actualFilesTable `" + tableName + "` is missing required `file_path` column"); - } + StructField filePathField = schema.apply(FILE_PATH); + Preconditions.checkArgument( + filePathField.dataType() == DataTypes.StringType, + "Invalid %s column: %s is not a string", + FILE_PATH, + filePathField.dataType()); + + StructField lastModifiedField = schema.apply(LAST_MODIFIED); + Preconditions.checkArgument( + lastModifiedField.dataType() == DataTypes.TimestampType, + "Invalid %s column: %s is not a timestamp", + LAST_MODIFIED, + lastModifiedField.dataType()); - this.providedActualFilesDF = spark().table(tableName); + this.compareToFileList = files; return this; } + private Dataset filteredCompareToFileList() { + Dataset files = compareToFileList; + if (location != null) { + files = files.filter(files.col(FILE_PATH).startsWith(location)); + } + return files + .filter(files.col(LAST_MODIFIED).lt(new Timestamp(olderThanTimestamp))) + .select(files.col(FILE_PATH)); + } + @Override public DeleteOrphanFiles.Result execute() { JobGroupInfo info = newJobGroupInfo("DELETE-ORPHAN-FILES", jobDesc()); @@ -194,7 +204,7 @@ private DeleteOrphanFiles.Result doExecute() { Dataset validContentFileDF = buildValidContentFileDF(table); Dataset validMetadataFileDF = buildValidMetadataFileDF(table); Dataset validFileDF = validContentFileDF.union(validMetadataFileDF); - Dataset actualFileDF = this.providedActualFilesDF == null ? buildActualFileDF() : providedActualFilesDF; + Dataset actualFileDF = compareToFileList == null ? 
buildActualFileDF() : filteredCompareToFileList(); Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH)); Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH)); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java index 4e53733ae469..f8c5e454b0ca 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/actions/BaseSparkAction.java @@ -62,6 +62,7 @@ abstract class BaseSparkAction implements Action { protected static final String FILE_PATH = "file_path"; protected static final String FILE_TYPE = "file_type"; + protected static final String LAST_MODIFIED = "last_modified"; private static final AtomicInteger JOB_COUNTER = new AtomicInteger(); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java index 9b5037a46de9..23d64719081c 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/RemoveOrphanFilesProcedure.java @@ -24,6 +24,7 @@ import org.apache.iceberg.actions.DeleteOrphanFiles; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.spark.actions.BaseDeleteOrphanFilesSparkAction; import org.apache.iceberg.spark.actions.SparkActions; import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder; import org.apache.iceberg.util.DateTimeUtil; @@ -50,7 +51,7 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure { ProcedureParameter.optional("location", DataTypes.StringType), ProcedureParameter.optional("dry_run", DataTypes.BooleanType), ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType), - ProcedureParameter.optional("actual_files_table", DataTypes.StringType) + ProcedureParameter.optional("file_list_view", DataTypes.StringType) }; private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{ @@ -88,16 +89,11 @@ public InternalRow[] call(InternalRow args) { String location = args.isNullAt(2) ? null : args.getString(2); boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3); Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4); - String tableWithActualFilePaths = args.isNullAt(5) ? null : args.getString(5); + String fileListView = args.isNullAt(5) ? 
null : args.getString(5); Preconditions.checkArgument(maxConcurrentDeletes == null || maxConcurrentDeletes > 0, "max_concurrent_deletes should have value > 0, value: " + maxConcurrentDeletes); - Preconditions.checkArgument( - tableWithActualFilePaths == null || (location == null && olderThanMillis == null), - "actual_files_table cannot be used with `location` or `older_than`" - ); - return withIcebergTable(tableIdent, table -> { DeleteOrphanFiles action = actions().deleteOrphanFiles(table); @@ -121,8 +117,8 @@ public InternalRow[] call(InternalRow args) { action.executeDeleteWith(executorService(maxConcurrentDeletes, "remove-orphans")); } - if (tableWithActualFilePaths != null) { - action.actualFilesTable(tableWithActualFilePaths); + if (fileListView != null) { + ((BaseDeleteOrphanFilesSparkAction) action).compareToFileList(spark().table(fileListView)); } DeleteOrphanFiles.Result result = action.execute(); diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java index 7405136c380f..39427e66b4dd 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRemoveOrphanFilesAction.java @@ -21,6 +21,7 @@ import java.io.File; import java.io.IOException; +import java.sql.Timestamp; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -53,6 +54,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkTestBase; +import org.apache.iceberg.spark.source.FilePathLastModifiedRecord; import org.apache.iceberg.spark.source.ThreeColumnRecord; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Dataset; @@ -150,20 +152,10 @@ public void testDryRun() throws IOException, InterruptedException { Assert.assertEquals("Action should find 1 file", invalidFiles, result2.orphanFileLocations()); Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0)))); - String actualFilesTableName = "actualFilesTable"; - spark.createDataset(allFiles, Encoders.STRING()).toDF("file_path").createOrReplaceTempView(actualFilesTableName); DeleteOrphanFiles.Result result3 = actions.deleteOrphanFiles(table) - .deleteWith(s -> { }) - .actualFilesTable(actualFilesTableName) - .execute(); - - Assert.assertEquals("Action should find 1 file", invalidFiles, result3.orphanFileLocations()); - Assert.assertTrue("Invalid file should be present", fs.exists(new Path(invalidFiles.get(0)))); - - DeleteOrphanFiles.Result result4 = actions.deleteOrphanFiles(table) .olderThan(System.currentTimeMillis()) .execute(); - Assert.assertEquals("Action should delete 1 file", invalidFiles, result4.orphanFileLocations()); + Assert.assertEquals("Action should delete 1 file", invalidFiles, result3.orphanFileLocations()); Assert.assertFalse("Invalid file should not be present", fs.exists(new Path(invalidFiles.get(0)))); List expectedRecords = Lists.newArrayList(); @@ -713,4 +705,120 @@ public void testGarbageCollectionDisabled() { ValidationException.class, "Cannot delete orphan files: GC is disabled", () -> SparkActions.get().deleteOrphanFiles(table).execute()); } + + @Test + public void testCompareToFileList() throws IOException, InterruptedException { + Table table = + TABLES.create(SCHEMA, 
PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation); + + List records = + Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA")); + + Dataset df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1); + + df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation); + + df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation); + + Path dataPath = new Path(tableLocation + "/data"); + FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf()); + List validFiles = + Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get())) + .filter(FileStatus::isFile) + .map( + file -> + new FilePathLastModifiedRecord( + file.getPath().toString(), new Timestamp(file.getModificationTime()))) + .collect(Collectors.toList()); + + Assert.assertEquals("Should be 2 valid files", 2, validFiles.size()); + + df.write().mode("append").parquet(tableLocation + "/data"); + + List allFiles = + Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get())) + .filter(FileStatus::isFile) + .map( + file -> + new FilePathLastModifiedRecord( + file.getPath().toString(), new Timestamp(file.getModificationTime()))) + .collect(Collectors.toList()); + + Assert.assertEquals("Should be 3 files", 3, allFiles.size()); + + List invalidFiles = Lists.newArrayList(allFiles); + invalidFiles.removeAll(validFiles); + List invalidFilePaths = + invalidFiles.stream() + .map(FilePathLastModifiedRecord::getFilePath) + .collect(Collectors.toList()); + Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size()); + + // sleep for 1 second to ensure files will be old enough + Thread.sleep(1000); + + SparkActions actions = SparkActions.get(); + + Dataset compareToFileList = + spark + .createDataFrame(allFiles, FilePathLastModifiedRecord.class) + .withColumnRenamed("filePath", "file_path") + .withColumnRenamed("lastModified", "last_modified"); + + DeleteOrphanFiles.Result result1 = + ((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table)) + .compareToFileList(compareToFileList) + .deleteWith(s -> { }) + .execute(); + Assert.assertTrue( + "Default olderThan interval should be safe", + Iterables.isEmpty(result1.orphanFileLocations())); + + DeleteOrphanFiles.Result result2 = + ((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table)) + .compareToFileList(compareToFileList) + .olderThan(System.currentTimeMillis()) + .deleteWith(s -> { }) + .execute(); + Assert.assertEquals( + "Action should find 1 file", invalidFilePaths, result2.orphanFileLocations()); + Assert.assertTrue( + "Invalid file should be present", fs.exists(new Path(invalidFilePaths.get(0)))); + + DeleteOrphanFiles.Result result3 = + ((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table)) + .compareToFileList(compareToFileList) + .olderThan(System.currentTimeMillis()) + .execute(); + Assert.assertEquals( + "Action should delete 1 file", invalidFilePaths, result3.orphanFileLocations()); + Assert.assertFalse( + "Invalid file should not be present", fs.exists(new Path(invalidFilePaths.get(0)))); + + List expectedRecords = Lists.newArrayList(); + expectedRecords.addAll(records); + expectedRecords.addAll(records); + + Dataset resultDF = spark.read().format("iceberg").load(tableLocation); + List actualRecords = + resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList(); + Assert.assertEquals("Rows must match", expectedRecords, actualRecords); + + List outsideLocationMockFiles = + Lists.newArrayList(new 
FilePathLastModifiedRecord("/tmp/mock1", new Timestamp(0L))); + + Dataset compareToFileListWithOutsideLocation = + spark + .createDataFrame(outsideLocationMockFiles, FilePathLastModifiedRecord.class) + .withColumnRenamed("filePath", "file_path") + .withColumnRenamed("lastModified", "last_modified"); + + DeleteOrphanFiles.Result result4 = + ((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table)) + .compareToFileList(compareToFileListWithOutsideLocation) + .deleteWith(s -> { }) + .execute(); + Assert.assertEquals( + "Action should find nothing", Lists.newArrayList(), result4.orphanFileLocations()); + } } diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/FilePathLastModifiedRecord.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/FilePathLastModifiedRecord.java new file mode 100644 index 000000000000..275e3a520db5 --- /dev/null +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/source/FilePathLastModifiedRecord.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import java.sql.Timestamp; +import java.util.Objects; + +public class FilePathLastModifiedRecord { + private String filePath; + private Timestamp lastModified; + + public FilePathLastModifiedRecord() { + } + + public FilePathLastModifiedRecord(String filePath, Timestamp lastModified) { + this.filePath = filePath; + this.lastModified = lastModified; + } + + public String getFilePath() { + return filePath; + } + + public void setFilePath(String filePath) { + this.filePath = filePath; + } + + public Timestamp getLastModified() { + return lastModified; + } + + public void setLastModified(Timestamp lastModified) { + this.lastModified = lastModified; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + FilePathLastModifiedRecord that = (FilePathLastModifiedRecord) o; + return Objects.equals(filePath, that.filePath) && + Objects.equals(lastModified, that.lastModified); + } + + @Override + public int hashCode() { + return Objects.hash(filePath, lastModified); + } + + @Override + public String toString() { + return "FilePathLastModifiedRecord{" + + "filePath='" + filePath + '\'' + + ", lastModified='" + lastModified + '\'' + + '}'; + } +}
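
--
Usage sketch (editor's illustration, not part of either patch): with PATCH 2/2 applied, a caller hands the action a pre-computed file inventory instead of letting it list directories. The dataset must expose the two columns compareToFileList validates, file_path (string) and last_modified (timestamp), and the action applies the same location and olderThan filtering it would apply to a directory listing. The session, table, inventory name `db.file_inventory`, and its source columns `path`/`modified_at` below are assumptions.

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.spark.actions.BaseDeleteOrphanFilesSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class CompareToFileListExample {

  // `spark` and `table` are assumed to exist; `db.file_inventory` is a
  // hypothetical pre-computed listing (for example, an S3 Inventory export).
  public static DeleteOrphanFiles.Result removeOrphans(SparkSession spark, Table table) {
    // Rename/cast the inventory columns to the names and types the action
    // validates: file_path (StringType) and last_modified (TimestampType).
    Dataset<Row> fileList = spark.table("db.file_inventory")
        .selectExpr(
            "path AS file_path",
            "CAST(modified_at AS timestamp) AS last_modified");

    // The cast mirrors the procedure's call site: compareToFileList lives on
    // the Spark implementation, not on the DeleteOrphanFiles API interface.
    return ((BaseDeleteOrphanFilesSparkAction) SparkActions.get().deleteOrphanFiles(table))
        .compareToFileList(fileList)
        .execute(); // default olderThan interval (3 days) still applies
  }
}

The SQL entry point takes a view name instead: register the dataset with fileList.createOrReplaceTempView("file_list") and then run, with a hypothetical catalog and table name, CALL spark_catalog.system.remove_orphan_files(table => 'db.tbl', file_list_view => 'file_list').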