Core: FileRewritePlanner implementation #12493

Merged

pvary merged 4 commits into apache:main from pvary:rewrite_new_api_impl on Mar 29, 2025

Conversation

@pvary (Contributor) commented Mar 10, 2025

This PR creates the core implementations for the Planners defined by #12306.

The PR deprecates the old *Rewriter implementations and adds tests for the new Planners.

This is part of the #11513 refactor.

github-actions bot added the core label on Mar 10, 2025
pvary force-pushed the rewrite_new_api_impl branch from 4d2f52b to c3c8cd8 on March 10, 2025 16:03
RewriteJobOrder.fromName(
    PropertyUtil.propertyAsString(
        options,
        RewriteDataFiles.REWRITE_JOB_ORDER,
Member:

Do these properties belong here now?

Contributor (Author):

The plan currently returns the jobs in order in a CloseableIterable.
We can push the ordering to the user, but this seems like something we can reuse between Spark and Flink, so I decided to keep it here.
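
Roughly the kind of shared logic I have in mind (a sketch only; the Group type and its size/file-count accessors below are placeholders, not the actual classes in this PR):

import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.util.PropertyUtil;

class JobOrderSketch {
  // Placeholder for the planner's group type; only the two measures needed
  // for ordering are modeled here.
  interface Group {
    long sizeInBytes();

    int numFiles();
  }

  // Order the planned groups according to the rewrite-job-order option so
  // that both Spark and Flink get the groups back already sorted.
  static void order(List<Group> groups, Map<String, String> options) {
    RewriteJobOrder jobOrder =
        RewriteJobOrder.fromName(
            PropertyUtil.propertyAsString(
                options,
                RewriteDataFiles.REWRITE_JOB_ORDER,
                RewriteDataFiles.REWRITE_JOB_ORDER_DEFAULT));

    switch (jobOrder) {
      case BYTES_ASC:
        groups.sort(Comparator.comparingLong(Group::sizeInBytes));
        break;
      case BYTES_DESC:
        groups.sort(Comparator.comparingLong(Group::sizeInBytes).reversed());
        break;
      case FILES_ASC:
        groups.sort(Comparator.comparingInt(Group::numFiles));
        break;
      case FILES_DESC:
        groups.sort(Comparator.comparingInt(Group::numFiles).reversed());
        break;
      default:
        break; // NONE: keep the planner's natural order
    }
  }
}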

Your thoughts?

@pvary (Contributor, Author) commented Mar 26, 2025

Thanks @RussellSpitzer for the review!
I have tried to answer your questions and address your comments!

* These will be grouped by partitions based on their size using fixed-size bins. Extends the {@link
* SizeBasedFileRewritePlanner} with {@link RewritePositionDeleteFiles#REWRITE_JOB_ORDER} handling.
*/
public class RewritePositionDeletesGroupPlanner
Contributor:

Probably RewritePositionDeletesPlanner is more accurate.

Contributor (Author):

Renamed to BinPackRewritePositionDeletePlanner

*
* <p>Defaults to Integer.MAX_VALUE, which means this feature is not enabled by default.
*/
public static final String DELETE_FILE_THRESHOLD = "delete-file-threshold";
Contributor:

I know this is refactored from existing code, but this config name doesn't seem the most intuitive. delete-file-threshold could create the impression of a delete file size threshold (in bytes). I think it really means deleted-rows-threshold. But it is probably too late to change it for compatibility reasons.

Contributor (Author):

While I agree with you, I also think we don't want to touch this in this PR (and probably not even later).
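
For reference, this is roughly how I read the existing option being applied during planning (a sketch only; the method name is made up, and the check against the number of delete files attached to a scan task is my reading of the code this was refactored from):

import java.util.Map;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.util.PropertyUtil;

class DeleteThresholdSketch {
  // Sketch: a data file becomes a rewrite candidate once the number of delete
  // files attached to its scan task reaches the configured threshold. The
  // Integer.MAX_VALUE default means the check never triggers unless it is set.
  static boolean tooManyDeletes(FileScanTask task, Map<String, String> options) {
    int threshold =
        PropertyUtil.propertyAsInt(options, "delete-file-threshold", Integer.MAX_VALUE);
    return task.deletes().size() >= threshold;
  }
}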

PositionDeletesScanTask.class::cast);
}

private StructLikeMap<List<PositionDeletesScanTask>> groupByPartition(
Contributor:

Some of the methods in this class look similar to the code in BinPackRewriteFilePlanner. Can the code be pushed to the base class, e.g. plan(), most of planFileGroups, etc.?

In this specific method, I guess PartitionUtil.coercePartition is probably different.

Contributor (Author):

I had the same feeling, and I tried it several times, but there are nuances which make them different and hard to generalize:

  • The input tasks are different
  • The partition handling is different (data file compaction handles partition evolution)
  • The file filtering logic is different
  • The group filtering logic is different
  • Result groups are different

I see the partition handling as the main issue.
When compacting data files we have this additional logic:

      // If a task uses an incompatible partition spec the data inside could contain values
      // which belong to multiple partitions in the current spec. Treating all such files as
      // un-partitioned and grouping them together helps to minimize new files made.

Position delete file compaction, on the other hand, just uses coercePartition to update the task partition to the current one.

Based on these, I decided against the generalization.
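
To illustrate the data file side of that difference (a sketch only, not the code in this PR; the position delete planner coerces the task partition instead of using the un-partitioned fallback shown here):

import java.util.ArrayList;
import java.util.List;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.StructLikeMap;

class GroupByPartitionSketch {
  // Sketch of the data file grouping: tasks written with an older,
  // incompatible partition spec may contain rows from several partitions of
  // the current spec, so they all map to one shared "unpartitioned" key and
  // get compacted together. Position delete planning instead coerces each
  // task's partition into the current partition type, which is why the two
  // groupByPartition implementations stay separate.
  static StructLikeMap<List<FileScanTask>> groupDataTasks(
      Table table, Iterable<FileScanTask> tasks) {
    Types.StructType partitionType = table.spec().partitionType();
    StructLikeMap<List<FileScanTask>> groups = StructLikeMap.create(partitionType);
    StructLike unpartitioned = GenericRecord.create(partitionType);

    for (FileScanTask task : tasks) {
      StructLike key =
          task.spec().specId() == table.spec().specId()
              ? task.file().partition()
              : unpartitioned; // incompatible spec: group as un-partitioned
      List<FileScanTask> group = groups.get(key);
      if (group == null) {
        group = new ArrayList<>();
        groups.put(key, group);
      }
      group.add(task);
    }

    return groups;
  }
}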

Contributor:

Sounds good.

@stevenzwu (Contributor) left a comment:

LGTM

@RussellSpitzer (Member):

I haven't had time to check the rest of this, but I trust @stevenzwu's review, so please go on without me.

pvary merged commit d597142 into apache:main on Mar 29, 2025
42 checks passed
@pvary (Contributor, Author) commented Mar 29, 2025

Merged to main.
Thanks for the review @stevenzwu and @RussellSpitzer!
