Spark: Update RewriteDataFilesSparkAction and RewritePositionDeleteFilesSparkAction to use the new APIs #12692
Conversation
Force-pushed f3dc0d9 to 84b780d
    assertThatThrownBy(() -> actions().rewriteDataFiles(table).binPack().sort())
        .isInstanceOf(IllegalArgumentException.class)
-       .hasMessage("Must use only one rewriter type (bin-pack, sort, zorder)");
+       .hasMessage("Must use only one runner type (bin-pack, sort, zorder)");
I'm not sure about this change.
The internal naming has changed, but I'm not sure we have to expose this to the users.
I wouldn't go with the "runner" term. That's not really a concept in Spark, so I think this just exposes the internals in a confusing way.
I decided to keep this error message intact.
Force-pushed fcf87ed to 5b5fc19
…lesSparkAction to use the new APIs
Force-pushed 5b5fc19 to 0f40605
@danielcweeks, @manuzhang, @RussellSpitzer: I think this PR is ready for review now. Sorry for the force push. Instead of creating new files, I renamed the old ones so it is easier to review. This required me to force-push my changes. I will not do it again during the review 😄
    Preconditions.checkArgument(
-       rewriter == null, "Must use only one rewriter type (bin-pack, sort, zorder)");
+       runner == null, "Must use only one rewriter type (bin-pack, sort, zorder)");
    this.rewriter = new SparkBinPackDataRewriter(spark(), table);
Follow-up: we need to clean up these messages. They should probably say "Rewriter type already set to %s" or something like that.
This is a second request to fix the error message, and it is not too complicated. So changed.
The solution is a bit lame:

    Preconditions.checkArgument(
        runner == null,
        "Rewriter type already set to %s",
        runner == null ? null : runner.description());
We need a second null check for the error message, or a null check around it 😢
Decided to hide this ugliness in a method.
If you have better ideas, feel free to comment
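A minimal sketch of hiding that null check in a helper method, as described above. All names here (`RewriteModeGuard`, `FileRewriteRunner`, `description`) are hypothetical, and a local stand-in replaces Guava's `Preconditions.checkArgument` so the snippet is self-contained:

```java
// Hypothetical sketch: hide the null-safe description lookup in a helper.
// Names are illustrative, not the PR's actual code.
class RewriteModeGuard {

  /** Stand-in for the runner interface; only the description matters here. */
  interface FileRewriteRunner {
    String description();
  }

  private FileRewriteRunner runner;

  RewriteModeGuard binPack() {
    setRunner(() -> "BIN-PACK");
    return this;
  }

  RewriteModeGuard sort() {
    setRunner(() -> "SORT");
    return this;
  }

  private void setRunner(FileRewriteRunner newRunner) {
    // The message argument is evaluated eagerly, before the condition is
    // checked, so the description must be fetched null-safely up front.
    checkArgument(runner == null, "Rewriter type already set to %s", description(runner));
    this.runner = newRunner;
  }

  // The "ugliness": a null check needed only to build the error message.
  private static String description(FileRewriteRunner runner) {
    return runner == null ? null : runner.description();
  }

  // Minimal local stand-in for Guava's Preconditions.checkArgument.
  private static void checkArgument(boolean condition, String template, Object arg) {
    if (!condition) {
      throw new IllegalArgumentException(String.format(template, arg));
    }
  }
}
```

The second null check only matters because the formatting argument is computed before `checkArgument` runs; without it, the happy path (`runner == null`) would throw a NullPointerException while building a message that is never used.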
    import org.apache.spark.sql.SparkSession;

    /**
     * Common parent for data and positional delete rewrite runners.
Unclear description here. I think this is meant to essentially replace the meat of the Action itself? Like it's a container for planner + rewriter?
The Action uses the two interfaces defined by the new API: the Planner for grouping, and the Runner for executing the actual compaction.
This class is the base for the Spark-based Runner implementations. Updated the javadoc.
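The planner/runner split described above can be illustrated with a toy model. Everything here is hypothetical: the real Iceberg interfaces carry table, Spark, and configuration context, while this sketch models files as plain sizes:

```java
// Hypothetical toy model of the planner/runner split: the Planner groups
// files, the Runner compacts each group, and the action wires them together.
import java.util.ArrayList;
import java.util.List;

class RewriteActionSketch {

  /** Planner side: groups input files into rewrite groups. */
  interface Planner {
    List<List<Long>> plan(List<Long> fileSizes);
  }

  /** Runner side: executes the actual compaction for one planned group. */
  interface Runner {
    List<Long> rewrite(List<Long> group);
  }

  /** Toy bin-pack planner: accumulates files until the target size is reached. */
  static Planner binPackPlanner(long targetSize) {
    return fileSizes -> {
      List<List<Long>> groups = new ArrayList<>();
      List<Long> current = new ArrayList<>();
      long currentSize = 0;
      for (long size : fileSizes) {
        current.add(size);
        currentSize += size;
        if (currentSize >= targetSize) {
          groups.add(current);
          current = new ArrayList<>();
          currentSize = 0;
        }
      }
      if (!current.isEmpty()) {
        groups.add(current);
      }
      return groups;
    };
  }

  /** Toy runner: compacts each group into a single output "file". */
  static Runner mergeRunner() {
    return group -> List.of(group.stream().mapToLong(Long::longValue).sum());
  }

  /** The action wires the two together: plan the groups, then run each one. */
  static List<Long> execute(Planner planner, Runner runner, List<Long> fileSizes) {
    List<Long> out = new ArrayList<>();
    for (List<Long> group : planner.plan(fileSizes)) {
      out.addAll(runner.rewrite(group));
    }
    return out;
  }
}
```

The design point this models is that the action itself stays thin: grouping policy lives in the Planner, execution strategy in the Runner, and either can be swapped independently.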
    import org.apache.iceberg.util.PropertyUtil;

    /**
     * Extends the {@link BinPackRewriteFilePlanner} with the possibility to set the expected
Needs more description here. Extends the BinPack rewriter for Rewriters that induce a distributed shuffle to reorganize data. (or something like this)
Again, this is for the Planner. There was a specific configuration for the shuffling rewriters which modifies the plans, so I had to extend the core planners with the new configuration.
Alternatively, we can move this functionality to the generic planner, in which case we don't need a specific class here. Then this config would be available for every planner (which might be useful if there are compaction changes on any rewrite), but we would have to reintroduce the shufflingPlanner flag (maybe with another name) to decide which Runner implementation to use.
I was just talking about the Java Doc. I have no problem with the code organization but the key detail about this planner is that it produces plans for Shuffling Rewrites, not that it has an additional option.
I thought this class adds the additional COMPRESSION_FACTOR option used when calculating expectedOutputFiles (line 60). I guess shuffle/sort improves the compression ratio and reduces file size.
Thanks for all the suggestions.
Updated the javadoc based on your input.
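A rough sketch of how such a compression factor could feed into the expected output file count. This is illustrative only: the class and method names, and the direction of the scaling, are assumptions, not Iceberg's actual formula:

```java
// Illustrative only: one way a compression factor might adjust the
// expected-output-files estimate. Shuffle/sort tends to improve the
// compression ratio, so the estimated output size is scaled down before
// dividing by the target file size. Direction of scaling is an assumption.
class ExpectedOutputFilesSketch {

  static int expectedOutputFiles(long inputSize, long targetFileSize, double compressionFactor) {
    double adjustedSize = inputSize / compressionFactor;
    // Always plan at least one output file.
    return (int) Math.max(1, (long) Math.ceil(adjustedSize / targetFileSize));
  }
}
```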
I added this to 1.10.0, since we are targeting removing deprecated APIs in 1.10.0.
            catalogName, tableIdent))
        .isInstanceOf(IllegalArgumentException.class)
-       .hasMessage("Must use only one rewriter type (bin-pack, sort, zorder)");
+       .hasMessageStartingWith("Rewriter type already set to ");
I know this was not correct before, but can we switch the error message here to "Cannot set rewriter, it has already been set to "?
Changed to "Cannot set rewrite mode, it has already been set to ". I think this is a bit better, but I'm OK with reverting to your version if you don't agree.
stevenzwu left a comment:
LGTM overall. added a couple minor comments
Merged to main.
…lesSparkAction to use the new APIs (apache#12692) (apache#1578) Co-authored-by: pvary <peter.vary.apache@gmail.com>