[GLUTEN-9034][VL] Add VeloxResizeBatchesExec for Shuffle#9035
[GLUTEN-9034][VL] Add VeloxResizeBatchesExec for Shuffle#9035jackylee-ch merged 9 commits intoapache:mainfrom
Conversation
|
Can you provide the complete dag diagram? Maybe it can be solved by adjusting the number of input partitions, such as maxPartitionSize? |
@jackylee-ch In some cases, it can be controled by |
@jackylee-ch yeah, agree with that. I'll update it later |
|
Run Gluten Clickhouse CI on x86 |
70d695b to
25afa8e
Compare
|
Run Gluten Clickhouse CI on x86 |
25afa8e to
2991053
Compare
|
Run Gluten Clickhouse CI on x86 |
1 similar comment
|
Run Gluten Clickhouse CI on x86 |
497d69b to
438a101
Compare
|
Run Gluten Clickhouse CI on x86 |
1 similar comment
|
Run Gluten Clickhouse CI on x86 |
@jackylee-ch @jinchengchenghh @zhztheplayer updated, and also update the title and description. please take a look when you are convenient |
|
Run Gluten Clickhouse CI on x86 |
1 similar comment
|
Run Gluten Clickhouse CI on x86 |
|
@WangGuangxin Any update on this patch? Could you do a rebase? Thanks! |
thinks, will rebase today |
cf4ff8b to
6b21ba9
Compare
|
Run Gluten Clickhouse CI on x86 |
|
Run Gluten Clickhouse CI on x86 |
| .intConf | ||
| .createOptional | ||
|
|
||
| val COLUMNAR_VELOX_RESIZE_BATCHES_SHUFFLE_INPUT_OUTPUT_MIN_SIZE = |
There was a problem hiding this comment.
s/SHUFFLE_INPUT_OUTPUT/SHUFFLE_OUTPUT ?
There was a problem hiding this comment.
This is used both for the BatchResizing before shuffle input and after shuffle output, so that we can reduce some config. Usually there is need to do too much customized min size config for these two scenario. What do you think?
| } | ||
|
|
||
| def veloxResizeBatchesShuffleInputRange: ResizeRange = { | ||
| def veloxResizeBatchesShuffleInputOutputRange: ResizeRange = { |
| import org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, SparkPlan} | ||
| import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQueryStageExec} | ||
|
|
||
| case class AppendBatchResizeAfterShuffleRead() extends Rule[SparkPlan] { |
There was a problem hiding this comment.
We don't have a rule for the resizing on the input side of shuffle. Can we make the ways of optimizations more consistent? Either both via rules, or both not?
There was a problem hiding this comment.
At first, resizing on the output side of shuffle followings the way for shuffle read, that's do it when converting to transformer. But after the DummpyLeafExec is introduced, it doesn't work.
So I'll refactor the way to add resizing on the input side of shuffle, to make it enabled by rule
There was a problem hiding this comment.
Fair enough. Let's keep it as an individual rule.
|
@jackylee-ch @zhztheplayer Please take another look. |
|
@jackylee-ch @zhztheplayer Do you have any further comments? |
| def maybeAddAppendBatchesExec(plan: SparkPlan): SparkPlan = { | ||
| plan match { | ||
| case shuffle: ColumnarShuffleExchangeExec | ||
| if !shuffle.useSortBasedShuffle && | ||
| VeloxConfig.get.veloxResizeBatchesShuffleInput => | ||
| val range = VeloxConfig.get.veloxResizeBatchesShuffleInputRange | ||
| val appendBatches = | ||
| VeloxResizeBatchesExec(shuffle.child, range.min, range.max) | ||
| shuffle.withNewChildren(Seq(appendBatches)) | ||
| case _ => plan | ||
| } | ||
| } | ||
|
|
There was a problem hiding this comment.
Thanks for factoring this out!
|
@jackylee-ch @zhztheplayer Can we merge this? |
jackylee-ch
left a comment
There was a problem hiding this comment.
Sorry for late response. Great work!
| val range = VeloxConfig.get.veloxResizeBatchesShuffleInputOutputRange | ||
| plan.transformUp { | ||
| case shuffle: ColumnarShuffleExchangeExec | ||
| if !shuffle.useSortBasedShuffle && |
There was a problem hiding this comment.
it looks like this will be only enabled on hash based shuffle?
Cc @marin-ma
There was a problem hiding this comment.
Yes. We don't need to resize input batches for sort-based shuffle.
|
Do you notice in some queries, the plan cannot be fully replaced? Like TPCDS Q95, there are 4 AQEShuffleRead, but only one add the VeloxResizeBatch node. @WangGuangxin |
|
Fixed in #11069 @WangGuangxin @jackylee-ch |





What changes were proposed in this pull request?
Shuffle read may generate small batch with few rows, which may hurt performance a lot.
A example in our production case is

So in this PR proposed to add
VeloxResizeBatchesExecright after shuffle read, the plan is changed toAs we can see, the average batch size increased from 9 to 1000, and the total hour reduced from 2066h to 766h.
(Fixes: #9034)
How was this patch tested?
manually and UT