From 6c153f475047f41e0158f1eb5b596b5f9683305b Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Tue, 11 Nov 2025 15:25:48 +0000 Subject: [PATCH 1/2] fix resize batch --- ...dBatchResizeForShuffleInputAndOutput.scala | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala index 7d6309bf93f3..f72f8430697b 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala @@ -22,6 +22,7 @@ import org.apache.gluten.execution.VeloxResizeBatchesExec import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.{ColumnarShuffleExchangeExec, SparkPlan} import org.apache.spark.sql.execution.adaptive.{AQEShuffleReadExec, ShuffleQueryStageExec} +import org.apache.spark.sql.execution.exchange.ReusedExchangeExec /** * Try to append [[VeloxResizeBatchesExec]] for shuffle input and output to make the batch sizes in @@ -44,24 +45,36 @@ case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] { VeloxResizeBatchesExec(shuffle.child, range.min, range.max) shuffle.withNewChildren(Seq(appendBatches)) case a @ AQEShuffleReadExec( - ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _), + ShuffleQueryStageExec( + _, + shuffle: ColumnarShuffleExchangeExec | + ReusedExchangeExec(_, _: ColumnarShuffleExchangeExec), + _), _) if resizeBatchesShuffleOutputEnabled && shuffle.shuffleWriterType.requiresResizingShuffleOutput => VeloxResizeBatchesExec(a, range.min, range.max) - // Since it's transformed in a bottom to up order, so we may first encountered + // Since it's transformed in a bottom to up order, so we may first encounter // ShuffeQueryStageExec, which is transformed to VeloxResizeBatchesExec(ShuffeQueryStageExec), // then we see AQEShuffleReadExec case a @ AQEShuffleReadExec( VeloxResizeBatchesExec( - s @ ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _), + s @ ShuffleQueryStageExec( + _, + shuffle: ColumnarShuffleExchangeExec | + ReusedExchangeExec(_, _: ColumnarShuffleExchangeExec), + _), _, _), _) if resizeBatchesShuffleOutputEnabled && shuffle.shuffleWriterType.requiresResizingShuffleOutput => VeloxResizeBatchesExec(a.copy(child = s), range.min, range.max) - case s @ ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _) + case s @ ShuffleQueryStageExec( + _, + shuffle: ColumnarShuffleExchangeExec | + ReusedExchangeExec(_, _: ColumnarShuffleExchangeExec), + _) if resizeBatchesShuffleOutputEnabled && shuffle.shuffleWriterType.requiresResizingShuffleOutput => VeloxResizeBatchesExec(s, range.min, range.max) From c3ea64c04616a0a8425b4421e084345b3c51e6b6 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Wed, 12 Nov 2025 09:46:53 +0000 Subject: [PATCH 2/2] fix compile --- ...dBatchResizeForShuffleInputAndOutput.scala | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala index f72f8430697b..92519eecf77d 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/extension/AppendBatchResizeForShuffleInputAndOutput.scala @@ -44,11 +44,16 @@ case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] { val appendBatches = VeloxResizeBatchesExec(shuffle.child, range.min, range.max) shuffle.withNewChildren(Seq(appendBatches)) + case a @ AQEShuffleReadExec( + ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _), + _) + if resizeBatchesShuffleOutputEnabled && + shuffle.shuffleWriterType.requiresResizingShuffleOutput => + VeloxResizeBatchesExec(a, range.min, range.max) case a @ AQEShuffleReadExec( ShuffleQueryStageExec( _, - shuffle: ColumnarShuffleExchangeExec | - ReusedExchangeExec(_, _: ColumnarShuffleExchangeExec), + ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec), _), _) if resizeBatchesShuffleOutputEnabled && @@ -57,12 +62,20 @@ case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] { // Since it's transformed in a bottom to up order, so we may first encounter // ShuffeQueryStageExec, which is transformed to VeloxResizeBatchesExec(ShuffeQueryStageExec), // then we see AQEShuffleReadExec + case a @ AQEShuffleReadExec( + VeloxResizeBatchesExec( + s @ ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _), + _, + _), + _) + if resizeBatchesShuffleOutputEnabled && + shuffle.shuffleWriterType.requiresResizingShuffleOutput => + VeloxResizeBatchesExec(a.copy(child = s), range.min, range.max) case a @ AQEShuffleReadExec( VeloxResizeBatchesExec( s @ ShuffleQueryStageExec( _, - shuffle: ColumnarShuffleExchangeExec | - ReusedExchangeExec(_, _: ColumnarShuffleExchangeExec), + ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec), _), _, _), @@ -70,10 +83,13 @@ case class AppendBatchResizeForShuffleInputAndOutput() extends Rule[SparkPlan] { if resizeBatchesShuffleOutputEnabled && shuffle.shuffleWriterType.requiresResizingShuffleOutput => VeloxResizeBatchesExec(a.copy(child = s), range.min, range.max) + case s @ ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec, _) + if resizeBatchesShuffleOutputEnabled && + shuffle.shuffleWriterType.requiresResizingShuffleOutput => + VeloxResizeBatchesExec(s, range.min, range.max) case s @ ShuffleQueryStageExec( _, - shuffle: ColumnarShuffleExchangeExec | - ReusedExchangeExec(_, _: ColumnarShuffleExchangeExec), + ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec), _) if resizeBatchesShuffleOutputEnabled && shuffle.shuffleWriterType.requiresResizingShuffleOutput =>