@@ -315,6 +315,46 @@ public void testConcurrentRemoveOrphanFilesWithInvalidInput() {
() -> sql(
"CALL %s.system.remove_orphan_files(table => '%s', max_concurrent_deletes => %s)",
catalogName, tableIdent, -1));

String tempViewName = "file_list_test";
spark.emptyDataFrame().createOrReplaceTempView(tempViewName);

AssertHelpers.assertThrows(
"Should throw an error if file_list_view is missing required columns",
IllegalArgumentException.class,
"does not exist. Available:",
() ->
sql(
"CALL %s.system.remove_orphan_files(table => '%s', file_list_view => '%s')",
catalogName, tableIdent, tempViewName));

spark
.createDataset(Lists.newArrayList(), Encoders.tuple(Encoders.INT(), Encoders.TIMESTAMP()))
.toDF("file_path", "last_modified")
.createOrReplaceTempView(tempViewName);

AssertHelpers.assertThrows(
"Should throw an error if file_path has wrong type",
IllegalArgumentException.class,
"Invalid file_path column",
() ->
sql(
"CALL %s.system.remove_orphan_files(table => '%s', file_list_view => '%s')",
catalogName, tableIdent, tempViewName));

spark
.createDataset(Lists.newArrayList(), Encoders.tuple(Encoders.STRING(), Encoders.STRING()))
.toDF("file_path", "last_modified")
.createOrReplaceTempView(tempViewName);

AssertHelpers.assertThrows(
"Should throw an error if last_modified has wrong type",
IllegalArgumentException.class,
"Invalid last_modified column",
() ->
sql(
"CALL %s.system.remove_orphan_files(table => '%s', file_list_view => '%s')",
catalogName, tableIdent, tempViewName));
}

@Test
@@ -21,6 +21,7 @@

import java.io.File;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
@@ -38,6 +39,7 @@
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HiddenPathFilter;
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.JobGroupInfo;
import org.apache.iceberg.util.PropertyUtil;
@@ -53,6 +55,8 @@
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.util.SerializableConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -69,7 +73,12 @@
* removes unreachable files that are older than 3 days using {@link Table#io()}. The behavior can be modified
* by passing a custom location to {@link #location} and a custom timestamp to {@link #olderThan(long)}.
* For example, someone might point this action to the data folder to clean up only orphan data files.
* In addition, there is a way to configure an alternative delete method via {@link #deleteWith(Consumer)}.
* <p>
* Configure an alternative delete method using {@link #deleteWith(Consumer)}.
* <p>
 * For full control of the set of files being evaluated, use the {@link #compareToFileList(Dataset)} method. This
 * skips the directory listing: any files in the provided dataset that are not found in table metadata will
 * be deleted, using the same {@link Table#location()} and {@link #olderThan(long)} filtering as above.
* <p>
* <em>Note:</em> It is dangerous to call this action with a short retention interval as it might corrupt
* the state of the table if another operation is writing at the same time.
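To make the new entry point concrete, here is a hedged usage sketch (not part of this PR): it assumes an already-loaded Iceberg Table named table and a Dataset<Row> named fileListDF whose columns are a string file_path and a timestamp last_modified, and it uses the same cast that the tests below use to reach the Spark-specific action class.

import java.util.concurrent.TimeUnit;

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.spark.actions.BaseDeleteOrphanFilesSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class CompareToFileListUsageSketch {
  // `table` and `fileListDF` are assumed inputs; they are not defined in this PR.
  static DeleteOrphanFiles.Result removeOrphans(Table table, Dataset<Row> fileListDF) {
    // Cast to the Spark implementation to reach compareToFileList, mirroring the tests below.
    return ((BaseDeleteOrphanFilesSparkAction) SparkActions.get().deleteOrphanFiles(table))
        .compareToFileList(fileListDF) // use the provided listing instead of walking directories
        .olderThan(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1)) // only files older than 1 day
        .execute();
  }
}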
@@ -99,6 +108,7 @@ public void accept(String file) {

private String location = null;
private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
private Dataset<Row> compareToFileList;
private Consumer<String> deleteFunc = defaultDelete;
private ExecutorService deleteExecutorService = null;

@@ -144,6 +154,37 @@ public BaseDeleteOrphanFilesSparkAction deleteWith(Consumer<String> newDeleteFun
return this;
}

public BaseDeleteOrphanFilesSparkAction compareToFileList(Dataset<Row> files) {
StructType schema = files.schema();

StructField filePathField = schema.apply(FILE_PATH);
Preconditions.checkArgument(
filePathField.dataType() == DataTypes.StringType,
"Invalid %s column: %s is not a string",
FILE_PATH,
filePathField.dataType());

StructField lastModifiedField = schema.apply(LAST_MODIFIED);
Preconditions.checkArgument(
lastModifiedField.dataType() == DataTypes.TimestampType,
"Invalid %s column: %s is not a timestamp",
LAST_MODIFIED,
lastModifiedField.dataType());

this.compareToFileList = files;
return this;
}

private Dataset<Row> filteredCompareToFileList() {
Dataset<Row> files = compareToFileList;
if (location != null) {
files = files.filter(files.col(FILE_PATH).startsWith(location));
}
Comment on lines +180 to +182 (Contributor Author):
@aokolnychyi -- re: #4503 (comment) does this startsWith location filtering seem reasonable to you for a short-term solution? I know there's some discussion about making some larger changes here.

return files
.filter(files.col(LAST_MODIFIED).lt(new Timestamp(olderThanTimestamp)))
.select(files.col(FILE_PATH));
}

@Override
public DeleteOrphanFiles.Result execute() {
JobGroupInfo info = newJobGroupInfo("DELETE-ORPHAN-FILES", jobDesc());
@@ -163,7 +204,7 @@ private DeleteOrphanFiles.Result doExecute() {
Dataset<Row> validContentFileDF = buildValidContentFileDF(table);
Dataset<Row> validMetadataFileDF = buildValidMetadataFileDF(table);
Dataset<Row> validFileDF = validContentFileDF.union(validMetadataFileDF);
Dataset<Row> actualFileDF = buildActualFileDF();
Dataset<Row> actualFileDF = compareToFileList == null ? buildActualFileDF() : filteredCompareToFileList();

Column actualFileName = filenameUDF.apply(actualFileDF.col(FILE_PATH));
Column validFileName = filenameUDF.apply(validFileDF.col(FILE_PATH));
@@ -62,6 +62,7 @@ abstract class BaseSparkAction<ThisT, R> implements Action<ThisT, R> {

protected static final String FILE_PATH = "file_path";
protected static final String FILE_TYPE = "file_type";
protected static final String LAST_MODIFIED = "last_modified";

private static final AtomicInteger JOB_COUNTER = new AtomicInteger();

@@ -24,6 +24,7 @@
import org.apache.iceberg.actions.DeleteOrphanFiles;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.actions.BaseDeleteOrphanFilesSparkAction;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.util.DateTimeUtil;
@@ -49,7 +50,8 @@ public class RemoveOrphanFilesProcedure extends BaseProcedure {
ProcedureParameter.optional("older_than", DataTypes.TimestampType),
ProcedureParameter.optional("location", DataTypes.StringType),
ProcedureParameter.optional("dry_run", DataTypes.BooleanType),
ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType)
ProcedureParameter.optional("max_concurrent_deletes", DataTypes.IntegerType),
ProcedureParameter.optional("file_list_view", DataTypes.StringType)
};

private static final StructType OUTPUT_TYPE = new StructType(new StructField[]{
@@ -80,12 +82,14 @@ public StructType outputType() {
}

@Override
@SuppressWarnings("checkstyle:CyclomaticComplexity")
public InternalRow[] call(InternalRow args) {
Identifier tableIdent = toIdentifier(args.getString(0), PARAMETERS[0].name());
Long olderThanMillis = args.isNullAt(1) ? null : DateTimeUtil.microsToMillis(args.getLong(1));
String location = args.isNullAt(2) ? null : args.getString(2);
boolean dryRun = args.isNullAt(3) ? false : args.getBoolean(3);
Integer maxConcurrentDeletes = args.isNullAt(4) ? null : args.getInt(4);
String fileListView = args.isNullAt(5) ? null : args.getString(5);

Preconditions.checkArgument(maxConcurrentDeletes == null || maxConcurrentDeletes > 0,
"max_concurrent_deletes should have value > 0, value: " + maxConcurrentDeletes);
@@ -113,6 +117,10 @@ public InternalRow[] call(InternalRow args) {
action.executeDeleteWith(executorService(maxConcurrentDeletes, "remove-orphans"));
}

if (fileListView != null) {
((BaseDeleteOrphanFilesSparkAction) action).compareToFileList(spark().table(fileListView));
}

DeleteOrphanFiles.Result result = action.execute();

return toOutputRows(result);
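A hedged sketch of exercising the new file_list_view parameter end to end, assuming a SparkSession named spark; the catalog name, table identifier, and view name below are placeholders, not values from this PR.

import java.util.List;

import org.apache.iceberg.spark.source.FilePathLastModifiedRecord;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class FileListViewProcedureSketch {
  static void removeOrphansFromListing(SparkSession spark, List<FilePathLastModifiedRecord> files) {
    // Register the listing as a temp view with the column names the action validates.
    Dataset<Row> listing = spark.createDataFrame(files, FilePathLastModifiedRecord.class)
        .withColumnRenamed("filePath", "file_path")
        .withColumnRenamed("lastModified", "last_modified");
    listing.createOrReplaceTempView("file_listing");

    // Pass the view name through the new optional procedure parameter.
    spark.sql("CALL my_catalog.system.remove_orphan_files("
        + "table => 'db.tbl', file_list_view => 'file_listing')");
  }
}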
@@ -21,6 +21,7 @@

import java.io.File;
import java.io.IOException;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -53,6 +54,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.source.FilePathLastModifiedRecord;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
@@ -703,4 +705,120 @@ public void testGarbageCollectionDisabled() {
ValidationException.class, "Cannot delete orphan files: GC is disabled",
() -> SparkActions.get().deleteOrphanFiles(table).execute());
}

@Test
public void testCompareToFileList() throws IOException, InterruptedException {
Table table =
TABLES.create(SCHEMA, PartitionSpec.unpartitioned(), Maps.newHashMap(), tableLocation);

List<ThreeColumnRecord> records =
Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));

Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);

df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);

df.select("c1", "c2", "c3").write().format("iceberg").mode("append").save(tableLocation);

Path dataPath = new Path(tableLocation + "/data");
FileSystem fs = dataPath.getFileSystem(spark.sessionState().newHadoopConf());
List<FilePathLastModifiedRecord> validFiles =
Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get()))
.filter(FileStatus::isFile)
.map(
file ->
new FilePathLastModifiedRecord(
file.getPath().toString(), new Timestamp(file.getModificationTime())))
.collect(Collectors.toList());

Assert.assertEquals("Should be 2 valid files", 2, validFiles.size());

df.write().mode("append").parquet(tableLocation + "/data");

List<FilePathLastModifiedRecord> allFiles =
Arrays.stream(fs.listStatus(dataPath, HiddenPathFilter.get()))
.filter(FileStatus::isFile)
.map(
file ->
new FilePathLastModifiedRecord(
file.getPath().toString(), new Timestamp(file.getModificationTime())))
.collect(Collectors.toList());

Assert.assertEquals("Should be 3 files", 3, allFiles.size());

List<FilePathLastModifiedRecord> invalidFiles = Lists.newArrayList(allFiles);
invalidFiles.removeAll(validFiles);
List<String> invalidFilePaths =
invalidFiles.stream()
.map(FilePathLastModifiedRecord::getFilePath)
.collect(Collectors.toList());
Assert.assertEquals("Should be 1 invalid file", 1, invalidFiles.size());

// sleep for 1 second to ensure files will be old enough
Thread.sleep(1000);
Comment (Contributor):
This should use waitUntilAfter rather than sleeping.
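A minimal sketch of that suggestion, assuming a test helper along these lines exists or is added; its exact name, location, and signature are not shown in this diff.

  // Hypothetical helper: spin until the wall clock has passed the given timestamp,
  // instead of sleeping for a fixed interval and hoping it was long enough.
  private static long waitUntilAfter(long timestampMillis) {
    long current = System.currentTimeMillis();
    while (current <= timestampMillis) {
      current = System.currentTimeMillis();
    }
    return current;
  }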


SparkActions actions = SparkActions.get();

Dataset<Row> compareToFileList =
spark
.createDataFrame(allFiles, FilePathLastModifiedRecord.class)
.withColumnRenamed("filePath", "file_path")
.withColumnRenamed("lastModified", "last_modified");

DeleteOrphanFiles.Result result1 =
((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
.compareToFileList(compareToFileList)
.deleteWith(s -> { })
.execute();
Assert.assertTrue(
"Default olderThan interval should be safe",
Iterables.isEmpty(result1.orphanFileLocations()));

DeleteOrphanFiles.Result result2 =
((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
.compareToFileList(compareToFileList)
.olderThan(System.currentTimeMillis())
.deleteWith(s -> { })
.execute();
Assert.assertEquals(
"Action should find 1 file", invalidFilePaths, result2.orphanFileLocations());
Assert.assertTrue(
"Invalid file should be present", fs.exists(new Path(invalidFilePaths.get(0))));

DeleteOrphanFiles.Result result3 =
((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
.compareToFileList(compareToFileList)
.olderThan(System.currentTimeMillis())
.execute();
Assert.assertEquals(
"Action should delete 1 file", invalidFilePaths, result3.orphanFileLocations());
Assert.assertFalse(
"Invalid file should not be present", fs.exists(new Path(invalidFilePaths.get(0))));

List<ThreeColumnRecord> expectedRecords = Lists.newArrayList();
expectedRecords.addAll(records);
expectedRecords.addAll(records);

Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
List<ThreeColumnRecord> actualRecords =
resultDF.as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
Assert.assertEquals("Rows must match", expectedRecords, actualRecords);

List<FilePathLastModifiedRecord> outsideLocationMockFiles =
Lists.newArrayList(new FilePathLastModifiedRecord("/tmp/mock1", new Timestamp(0L)));

Dataset<Row> compareToFileListWithOutsideLocation =
spark
.createDataFrame(outsideLocationMockFiles, FilePathLastModifiedRecord.class)
.withColumnRenamed("filePath", "file_path")
.withColumnRenamed("lastModified", "last_modified");

DeleteOrphanFiles.Result result4 =
((BaseDeleteOrphanFilesSparkAction) actions.deleteOrphanFiles(table))
.compareToFileList(compareToFileListWithOutsideLocation)
.deleteWith(s -> { })
.execute();
Assert.assertEquals(
"Action should find nothing", Lists.newArrayList(), result4.orphanFileLocations());
}
}