Core, Spark 3.3: Add FileRewriter API #7175
Conversation
Force-pushed: 35d951f to 1333bc7
 * @param <T> the Java type of tasks to read content files
 * @param <F> the Java type of content files
 */
public interface FileRewriter<T extends ContentScanTask<F>, F extends ContentFile<F>> {
I created a separate hierarchy because I did not follow the existing API exactly:
- I replaced `RewriteStrategy options(map)` with `void init(map)`. To support proper method chaining with inheritance, we would need to parameterize the strategy and rewriter with `ThisT`, like we do in `Scan`, for instance. That was not done, and method chaining was partially broken in the existing strategies. To simplify the API, I went with the approach we use in catalogs and `FileIO` and added `void init(map)` instead.
- I did not make the new interface serializable because our strategies are not used in a distributed fashion, and they cannot be serialized in their current form anyway (they have non-serializable fields). If we want to support that functionality in the future, we can add it later. We should either mark rewriters serializable and properly support that, or not do it at all.
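A minimal usage sketch of the init-based pattern (the `createRewriter()` and `planGroup()` calls are placeholders, not part of this PR):

```java
// Hypothetical usage: configure a rewriter once via init(Map), the way
// catalogs and FileIO are initialized, instead of chaining options(Map).
Map<String, String> options =
    ImmutableMap.of("target-file-size-bytes", "536870912"); // 512 MB target

FileRewriter<FileScanTask, DataFile> rewriter = createRewriter(); // placeholder factory
rewriter.init(options); // validates supported options and sets internal state

List<FileScanTask> group = planGroup(); // placeholder: one group of tasks to compact
Set<DataFile> newFiles = rewriter.rewrite(group);
```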
long defaultMax = (long) (target * MAX_FILE_SIZE_DEFAULT_RATIO);
long max = propertyAsLong(options, MAX_FILE_SIZE_BYTES, defaultMax);

checkArgument(target > 0, "'%s' is set to %s but must be > 0", TARGET_FILE_SIZE_BYTES, target);
I switched the negation in error messages. I believe it is easier to understand an error message that states what the value should look like, rather than what it can't be.
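For example (the "before" message is paraphrased for illustration, not quoted from the old code):

```java
// Before (paraphrased): states what the value cannot be.
checkArgument(target > 0, "'%s' cannot be <= 0, found %s", TARGET_FILE_SIZE_BYTES, target);

// After (from this PR): states what the value must be.
checkArgument(target > 0, "'%s' is set to %s but must be > 0", TARGET_FILE_SIZE_BYTES, target);
```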
}

@Override
public Set<DataFile> rewrite(List<FileScanTask> group) {
I added this class because the rewrite lifecycle is the same in all three compaction strategies, so I put all of the common logic here.
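A rough sketch of the kind of shared lifecycle such a base class can hold (the class name and all helper methods below are illustrative, not the actual API from this PR):

```java
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;

// Illustrative base class: the rewrite lifecycle is shared, while subclasses
// implement only the strategy-specific write step (bin-pack, sort, z-order).
public abstract class ExampleDataRewriter implements FileRewriter<FileScanTask, DataFile> {

  @Override
  public Set<DataFile> rewrite(List<FileScanTask> group) {
    String groupId = UUID.randomUUID().toString();
    try {
      stageTasks(groupId, group);    // illustrative: register the group's tasks
      doRewrite(groupId, group);     // strategy-specific write
      return fetchNewFiles(groupId); // illustrative: collect the written files
    } finally {
      cleanAll(groupId);             // illustrative: always release staged state
    }
  }

  protected abstract void doRewrite(String groupId, List<FileScanTask> group);

  protected abstract void stageTasks(String groupId, List<FileScanTask> group);

  protected abstract Set<DataFile> fetchNewFiles(String groupId);

  protected abstract void cleanAll(String groupId);
}
```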
 * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link FileRewriter} instead.
 */
@Deprecated
public interface RewriteStrategy extends Serializable {
I decided not to migrate our existing Spark strategies because they were public. Also, we now have more information about all types of rewrites, so we can structure the code a bit differently.
Force-pushed: 1333bc7 to 809dd9b
This is ready for review; I am still working on tests.
Force-pushed: 809dd9b to 7c1d222
szehon-ho left a comment:
Looks mostly good to me, have some small comments.
 * Returns a set of supported options for this rewriter. This is an allowed-list and any options
 * not specified here will be rejected at runtime.
 *
 * @return returns a set of supported options
Nit: extra "returns"
Is the @return tag redundant? (comparing with other javadoc comments)
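For reference, a hedged sketch of how such an allowed-list might be enforced during init (the helper below is illustrative, not the exact implementation):

```java
// Illustrative: reject any option key that is not in validOptions().
private void validateOptions(Map<String, String> options) {
  Set<String> invalidKeys = Sets.newHashSet(options.keySet());
  invalidKeys.removeAll(validOptions());
  Preconditions.checkArgument(
      invalidKeys.isEmpty(),
      "Cannot use options %s, they are not supported by the rewriter",
      invalidKeys);
}
```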
return Iterables.filter(tasks, task -> hasSuboptimalSize(task) || hasTooManyDeletes(task));
}

private boolean hasTooManyDeletes(FileScanTask task) {
Remove 'has' to make it just 'tooManyDeletes'?
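For context, a hedged sketch of what this predicate checks (assuming `deleteFileThreshold` holds the resolved delete-file-threshold option):

```java
// Illustrative: a task qualifies for rewriting once it references at least
// deleteFileThreshold delete files.
private boolean hasTooManyDeletes(FileScanTask task) {
  return task.deletes() != null && task.deletes().size() >= deleteFileThreshold;
}
```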
public static final String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";

/**
 * Adjusts files which will be considered for rewriting. Files smaller than this value will be
The word 'adjusts' seems strange here (the file itself is not changed?).
Also, 'functions independently' seems unclear. Can we clarify, e.g.:
Any file with a size under this threshold will be rewritten, regardless of ...
Also, one thought: since we mention "regardless of MAX_FILE_SIZE_BYTES" here, does it make sense to just say "regardless of any other criteria"? There is also the question of whether we need to check tooManyDeletes as well.
public static final double MIN_FILE_SIZE_DEFAULT_RATIO = 0.75;

/**
 * Adjusts files which will be considered for rewriting. Files larger than this value will be
Same comment as above
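For context, a hedged sketch of how the min/max thresholds and their defaults fit together (the value of the max ratio constant is not shown in this diff):

```java
// Defaults are derived from the target size, per the constants in this diff.
long defaultMin = (long) (target * MIN_FILE_SIZE_DEFAULT_RATIO); // 0.75 * target
long defaultMax = (long) (target * MAX_FILE_SIZE_DEFAULT_RATIO);

// Illustrative: a file becomes a rewrite candidate when its size falls
// outside the [minFileSize, maxFileSize] band around the target.
boolean suboptimalSize = task.length() < minFileSize || task.length() > maxFileSize;
```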
 * groups. This option controls the largest amount of data that should be rewritten in a single
 * group. It helps with breaking down the rewriting of very large partitions which may not be
 * rewritable otherwise due to the resource constraints of the cluster. For example, a sort-based
 * rewrite may not scale to TB sized partitions, those partitions need to be worked on in small
Nit: missing and
TB-sized partitions, and those partitions
 * rewrite may not scale to TB sized partitions, those partitions need to be worked on in small
 * subsections to avoid exhaustion of resources.
 *
 * <p>When grouping files, the file rewriter will use this value to limit the files which will be
Same here, I feel this context is more useful at the class level.
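A hedged sketch of size-limited grouping using Iceberg's BinPacking utility (the exact wiring in this PR may differ):

```java
// Illustrative: pack candidate tasks into groups of at most maxGroupSize
// bytes, so one very large partition is rewritten as several smaller groups.
private Iterable<List<FileScanTask>> planGroups(List<FileScanTask> tasks, long maxGroupSize) {
  BinPacking.ListPacker<FileScanTask> packer =
      new BinPacking.ListPacker<>(maxGroupSize, 1, false);
  return packer.pack(tasks, FileScanTask::length);
}
```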
/**
 * Estimates a larger max target file size than the target size used in task creation to avoid
 * tasks which are predicted to have a certain size, but exceed that target size when serde is
 * complete creating tiny remainder files.
Hard to read, a comma may help:
"when serde is complete, creating tiny remainder files"
Also I realize this explanation is just repeated on the below paragraph. Can't this be simpler and just be:
"Estimates a larger max target file size than the target size used in task creation to avoid creating tiny remainder files."
 * that the actual data will end up being larger than our target size due to various factors of
 * compression, serialization and other factors outside our control. If this occurs, instead of
 * making a single file that is close in size to our target, we would end up producing one file of
 * the target size, and then a small extra file with the remaining data. For example, if our
Suggest putting "For example" in a new paragraph.
 * making a single file that is close in size to our target, we would end up producing one file of
 * the target size, and then a small extra file with the remaining data. For example, if our
 * target is 512 MB, we may generate a rewrite task that should be 500 MB. When we write the data
 * we may find we actually have to write out 530 MB. If we use the target size while writing we
nit: comma before we
"while writing, we..."
I added tests and addressed some comments; I'll address the rest by the end of today.
return group.size() > 1 && group.size() >= minInputFiles;
}

protected boolean enoughData(List<T> group) {
Should the name be more generic (data => rows), so there is no confusion with data/delete files?
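A hedged sketch of how such predicates might gate a group rewrite (which conditions are combined, and how, is illustrative):

```java
// Illustrative: rewrite a group if it has enough files to compact, or
// enough total content to produce at least one full target-size file.
private boolean shouldRewrite(List<FileScanTask> group) {
  boolean enoughInputFiles = group.size() > 1 && group.size() >= minInputFiles;
  long totalBytes = group.stream().mapToLong(FileScanTask::length).sum();
  boolean enoughData = totalBytes > targetFileSize;
  return enoughInputFiles || enoughData;
}
```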
/**
 * A class for rewriting content files.
 *
 * <p>The entire rewrite operation is broken down into pieces based on partitioning, and size-based
@szehon-ho, moved some of those detailed comments here instead of per option.
public static final String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";

/**
 * Controls which files will be considered for rewriting. Files with sizes under this threshold
@szehon-ho, changed docs for many of these options too. Another look would be appreciated!
@szehon-ho, this one is ready for another round.
szehon-ho left a comment:
Looks great to me, thanks for the changes!
Thanks a lot for reviewing, @szehon-ho!
int value =
    PropertyUtil.propertyAsInt(options, DELETE_FILE_THRESHOLD, DELETE_FILE_THRESHOLD_DEFAULT);
Preconditions.checkArgument(
    value >= 0, "'%s' is set to %s but must be >= 0", DELETE_FILE_THRESHOLD, value);
@aokolnychyi Why do we allow the delete-file-threshold to be 0 here? Is there a meaningful use case?
This PR refactors our compaction code so that it can be used for position delete file rewrites.
In particular, the following interfaces/classes have been added:

- `FileRewriter` - a generic API for rewriting content files
- `SizeBasedFileRewriter` - a common rewriter for content files, primarily based on file size
- `SizeBasedDataRewriter` - a common data rewriter
- `SparkSizeBasedDataRewriter` - a Spark data rewriter that stages tasks and uses a commit coordinator
- `SparkBinPackDataRewriter` - a Spark data rewriter that uses bin-packing
- `SparkSortDataRewriter` - a Spark data rewriter that shuffles and sorts data
- `SparkZOrderDataRewriter` - a Spark data rewriter that shuffles and z-orders data

The new API has the same behavior as the existing rewrite strategies.
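A rough sketch of how these pieces relate (generic parameters are simplified and the method list is an approximation, not a specification):

```
FileRewriter<T, F>                      core API: validOptions, init, rewrite
  └─ SizeBasedFileRewriter<T, F>        size-based candidate selection and grouping
       └─ SizeBasedDataRewriter         data-file specifics (e.g. delete file threshold)
            └─ SparkSizeBasedDataRewriter   stages tasks, uses the commit coordinator
                 ├─ SparkBinPackDataRewriter
                 ├─ SparkSortDataRewriter
                 └─ SparkZOrderDataRewriter
```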
FileRewriter- a generic API for rewriting content filesSizeBasedFileRewriter- a common rewriter for content files primary based on file sizeSizeBasedDataRewriter- a common data rewriterSparkSizeBasedDataRewriter- a Spark data rewriter that stages tasks and uses a commit coordinatorSparkBinPackDataRewriter- a Spark data rewriter that uses bin-packingSparkSortDataRewriter- a Spark data rewriter that shuffles and sorts dataSparkZOrderDataRewriter- a Spark data rewriter that shuffles and zorders dataThe new API has the same behavior as the existing rewrite strategies.