Spark: Implement an action to remove orphan files #894
Conversation
@Test
public void testAllValidFilesAreKept() throws IOException, InterruptedException {
Can you also add a test for the Write-Audit-Publish (WAP) workflow case, where a snapshot can be staged (using the cherry-pick operation) and is therefore not part of the list of active snapshots? The expected behavior is that this action should not delete those staged files as orphans.
There are tests in TestWapWorkflow that illustrate this case.
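A rough sketch of what such a test could look like (the fixture names — TABLES, SCHEMA, SPEC, tableLocation, spark, ThreeColumnRecord — and the Actions.forTable(...).removeOrphanFiles() entry point returning the deleted file locations are assumptions based on the surrounding test class and this PR's API, so the exact calls may differ):

```java
@Test
public void testWapFilesAreKept() throws InterruptedException {
  Table table = TABLES.create(SCHEMA, SPEC, tableLocation);
  table.updateProperties().set(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true").commit();

  List<ThreeColumnRecord> records = Lists.newArrayList(new ThreeColumnRecord(1, "AAAAAAAAAA", "AAAA"));
  Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class).coalesce(1);

  // normal write
  df.write().format("iceberg").mode("append").save(tableLocation);

  // staged (WAP) write: the snapshot is cherry-pickable but not part of the active history
  spark.conf().set("spark.wap.id", "1");
  df.write().format("iceberg").mode("append").save(tableLocation);

  // wait so that the staged files are older than the removal threshold
  Thread.sleep(1000);

  List<String> result = Actions.forTable(table)
      .removeOrphanFiles()
      .olderThan(System.currentTimeMillis())
      .execute();

  Assert.assertTrue("Must not delete any files: staged WAP files are not orphans", result.isEmpty());
}
```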
Good idea, will add a case for that.
I agree it would be nice to have a test for it. This should work because the metadata tables used return all files reachable by metadata, not just the ones in a single snapshot. We use the same table for a similar check in our environment.
There is one case when all_data_files won't report WAP files: when there is no snapshot.
@Override
public CloseableIterable<FileScanTask> planFiles() {
  Snapshot snapshot = snapshot();
  if (snapshot != null) {
    LOG.info("Scanning table {} snapshot {} created at {} with filter {}", table,
        snapshot.snapshotId(), formatTimestampMillis(snapshot.timestampMillis()),
        rowFilter);
    Listeners.notifyAll(
        new ScanEvent(table.toString(), snapshot.snapshotId(), rowFilter, schema()));
    return planFiles(ops, snapshot, rowFilter, caseSensitive, colStats);
  } else {
    LOG.info("Scanning empty table {}", table);
    return CloseableIterable.empty();
  }
}
Also, table.currentSnapshot().manifestListLocation() in location() can lead to an NPE.
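For illustration only (this is not the PR's actual location() code), the kind of null guard that would avoid it:

```java
// sketch: only read the manifest list location if the table has a current snapshot
Snapshot currentSnapshot = table.currentSnapshot();
String manifestListLocation = currentSnapshot != null ? currentSnapshot.manifestListLocation() : null;
```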
As long as we have a current snapshot, it does work correctly. I've added a test.
I thought we addressed the metadata table problem when there is no current snapshot in #801. I'll check to see why that doesn't work. My initial guess is that PR refers to static tables and this is a parallel table.
It's just individual tables that were fixed. The solution is to override planFiles:
@Override
public CloseableIterable<FileScanTask> planFiles() {
  // override planFiles to avoid the check for a current snapshot because this metadata table is for all snapshots
  return CloseableIterable.withNoopClose(HistoryTable.this.task(this));
}
this.dataLocation = table.locationProvider().dataLocation();
}

public RemoveOrphanFilesAction allDataFilesTable(String newAllDataFilesTable)
This seems awkward, but I'm not sure of a better way to do it.
Any other ideas are more than welcome :)
In other places in Spark, we detect path tables by checking contains("/"). We could do that here to construct the metadata table names:
public String metadataTableName(Table table, String metaTable) {
  String tableName = table.toString();
  if (tableName.contains("/")) {
    return tableName + "#" + metaTable;
  } else {
    return tableName + "." + metaTable;
  }
}
I think that convention for naming isn't unreasonable considering how we do it by default for Spark.
We could also allow passing in BiFunction<String, String, String> metadataTableName that we just default to the implementation above.
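A quick sketch of that option (the field and setter names here are illustrative, not from the PR):

```java
// default naming convention: path-based tables use '#', catalog tables use '.'
private BiFunction<String, String, String> metadataTableName = (tableName, metaTable) ->
    tableName.contains("/") ? tableName + "#" + metaTable : tableName + "." + metaTable;

public RemoveOrphanFilesAction metadataTableName(BiFunction<String, String, String> newMetadataTableName) {
  this.metadataTableName = newMetadataTableName;
  return this;
}
```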
This almost works. Unfortunately, table.toString() might return anything. For example, it will prepend hive. for friendly names in the Hive catalog and we won't be able to resolve hive.db.table.all_data_files. Exposing a correct TableIdentifier in Table would help but that would mean modifying public BaseTable as well.
Okay, I thought this would work because it is what we do in our Spark 2.4 build. Since we are using DSv2 catalogs, the catalog adds its name to the identifier, so we actually get a working multi-catalog identifier.
Maybe we should add something to remove hive. for now, and take it out for Spark 3.0.
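Something along these lines for the interim workaround (a sketch; the method shape is illustrative):

```java
private String metadataTableName(String metaTable) {
  String tableName = table.toString();
  if (tableName.contains("/")) {
    // path-based tables use the '#' separator
    return tableName + "#" + metaTable;
  } else if (tableName.startsWith("hive.")) {
    // drop the "hive." prefix added for friendly names so that db.table.all_data_files resolves
    return tableName.replaceFirst("hive\\.", "") + "." + metaTable;
  } else {
    return tableName + "." + metaTable;
  }
}
```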
TestIcebergSourceHiveTables.currentIdentifier.name());
  return null;
});

public void dropTable() throws IOException {
We have to clean the location properly.
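For example, something like this in the teardown (a sketch; catalog, tableLocation, and hiveConf are assumed to come from the test setup):

```java
@After
public void dropTable() throws IOException {
  catalog.dropTable(TestIcebergSourceHiveTables.currentIdentifier);

  // also remove the files on disk so the next test starts from a clean location
  Path tablePath = new Path(tableLocation);
  FileSystem fs = tablePath.getFileSystem(hiveConf);
  if (fs.exists(tablePath)) {
    fs.delete(tablePath, true /* recursive */);
  }
}
```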
Thanks @aokolnychyi for the PR. Based on what I see in the PR, you are trying to clean up files that are not referenced by …

@mehtaashish23, this action should remove all orphan files, including those that we failed to delete while expiring snapshots. The …
package org.apache.iceberg;

public interface Action<R> {
Should we create an actions package?
otherMetadataFiles.add(ops.metadataFileLocation("version-hint.text"));
Good catch!
List<String> matchingTopLevelFiles = Lists.newArrayList();

try {
  Path path = new Path(location);
Now that this is the table's location, we expect it to contain just two directories: data and metadata. I think the intent of this method was to parallelize on the first level of partition directories, but that's not what will happen here.
It's a bit trickier because we don't know that the layout actually matches the default structure, but I think it would be reasonable to traverse the first 2 levels of directories to build the top-level set. To do that, it makes sense to add a depth parameter to the recursive traversal so you can use it here and return after 2 levels (or a configurable number). That would also be a good thing for the parallel traversal, to ensure it won't get caught in a symlink loop.
There is one more problem: the initial number of locations might be pretty small. It seems beneficial to list, for example, 3 levels on the driver and then parallelize. The number of top-level partitions might be small. At the same time, we should avoid listing too much on the driver if the data is written to the root table location. That's why I modified the listing logic so that we list 3 levels by default but don't list locations that have more than 10 sub-locations. The latter will be listed in a distributed manner. This should cover cases with a lot of top-level and leaf partitions.
Actually, ignore my previous comment. I'll think more about this.
I think it's fine to list a fixed-number of levels. If all the data is written to the root location, there's nothing we can do anyway because it can't be parallelized.
After thinking about this more, I think it still makes sense to list 2 levels and stop whenever we hit, let's say, 10 sub-locations. It won't solve the problem when the number of top-level partitions is small. However, it should help if the table is partitioned but the data is written to the table location. Consider tables that were migrated from Hive: if they have 1000 top-level partitions and 10 sub-partitions each, we would be listing 10000 locations on the driver just for the first two levels.
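Roughly what I have in mind, as a sketch (the helper name and parameters are illustrative, not the final implementation):

```java
private static void listDirRecursively(
    String dir, Predicate<FileStatus> predicate, Configuration conf, int maxDepth,
    int maxDirectSubDirs, List<String> remainingSubDirs, List<String> matchingFiles) {

  // stop descending once the depth limit is reached; the caller lists the leftovers in parallel
  if (maxDepth <= 0) {
    remainingSubDirs.add(dir);
    return;
  }

  try {
    Path path = new Path(dir);
    FileSystem fs = path.getFileSystem(conf);

    List<String> subDirs = Lists.newArrayList();
    for (FileStatus file : fs.listStatus(path)) {
      if (file.isDirectory()) {
        subDirs.add(file.getPath().toString());
      } else if (file.isFile() && predicate.test(file)) {
        matchingFiles.add(file.getPath().toString());
      }
    }

    // too many direct sub-locations: defer them instead of listing everything on the driver
    if (subDirs.size() > maxDirectSubDirs) {
      remainingSubDirs.addAll(subDirs);
      return;
    }

    for (String subDir : subDirs) {
      listDirRecursively(subDir, predicate, conf, maxDepth - 1, maxDirectSubDirs, remainingSubDirs, matchingFiles);
    }
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  }
}
```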
@rdblue, what do you think?
return (FlatMapFunction<Iterator<String>, String>) dirs -> {
  List<String> files = Lists.newArrayList();
  Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;
Nit: predicate isn't a very descriptive name. Maybe pastOperationTimeLimit instead?
I kept the short name so that the line below stays on one line.
@aokolnychyi, this looks almost ready to me. The parallel file listing looks like it needs to be updated, and we need javadoc. Otherwise I think the other points can be done as follow-ups.
  return manifestDF.union(otherMetadataFileDF);
}

private Dataset<Row> buildActualFileDF() {
A comment here on the criteria for collecting all actual files would be helpful.
@prodeezy, do you mean the actual algorithm or that we select files older than a given timestamp?
private String location = null;
private long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(3);
private Consumer<String> deleteFunc = new Consumer<String>() {
Nit: we should be able to write this as table.io()::deleteFile.
That's what I tried in the first place. Unfortunately, it complains with:
Variable 'table' might not have been initialized
I think we had the same problem in RemoveSnapshots.
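For reference, the shape of the workaround (a sketch): a method reference in the field initializer would read table (to evaluate table.io()) before the constructor assigns it, while the anonymous class defers that access until accept() runs.

```java
private Consumer<String> deleteFunc = new Consumer<String>() {
  @Override
  public void accept(String file) {
    // table.io() is resolved lazily here, after the constructor has assigned 'table'
    table.io().deleteFile(file);
  }
};
```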
That makes sense. Let's go with this then.
Predicate<FileStatus> predicate = file -> file.getModificationTime() < olderThanTimestamp;

int maxDepth = Integer.MAX_VALUE;
This seems excessive, but not really that dangerous. When listing in executors, the purpose is to exit even if there is a reference cycle in the file system. This would technically do that, but it would recurse 2 billion levels, so the more likely failure is a stack overflow.
That's alright since it's the behavior that was here before, but I think it would be better to set this to 2,000 or something large but reasonable, and then throw an exception if there are remaining directories when it returns.
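A minimal sketch of that guard, reusing the depth-limited listDirRecursively helper sketched earlier in this thread (the constant and the error message are illustrative):

```java
private static final int MAX_EXECUTOR_LIST_DEPTH = 2000;

private static List<String> listFilesOnExecutor(String dir, Predicate<FileStatus> predicate, Configuration conf) {
  List<String> matchingFiles = Lists.newArrayList();
  List<String> remainingSubDirs = Lists.newArrayList();

  listDirRecursively(dir, predicate, conf, MAX_EXECUTOR_LIST_DEPTH,
      Integer.MAX_VALUE /* no direct sub-dir limit on executors */, remainingSubDirs, matchingFiles);

  if (!remainingSubDirs.isEmpty()) {
    // hitting the limit on executors most likely indicates a cycle (e.g. symlinks) in the file system
    throw new RuntimeException("Could not list sub-directories, reached the maximum depth: " + MAX_EXECUTOR_LIST_DEPTH);
  }

  return matchingFiles;
}
```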
I'm merging this since it's large and the remaining comments are minor. That avoids needing to re-read the whole commit for small updates. Thanks for adding this, @aokolnychyi! I think it is going to be really useful.
This PR adds a Spark action that removes orphan data and metadata files that can be left behind in some edge cases, such as executor preemption.