From 78534d8254c668ecde32d9188d94b48b264d507b Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 3 Apr 2025 14:51:19 +0200 Subject: [PATCH 01/14] fixup --- .gitignore | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index f402cb1e83e0..ab86334c8ebd 100644 --- a/.gitignore +++ b/.gitignore @@ -56,8 +56,7 @@ cmake_install.cmake build/ *-build/ Testing/ -cmake-build-debug/ -cmake-build-release/ +cmake-build-*/ ep/_ep/ # Editor temporary/working/backup files # From 51c422a9647f2e7d7d6abeb363849db9707ceded Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 3 Apr 2025 16:26:53 +0200 Subject: [PATCH 02/14] fixup --- .../gluten/execution/OffloadDeltaNode.scala | 2 +- .../clickhouse/CHSparkPlanExecApi.scala | 6 +- .../extension/columnar/SingleNodeOps.scala | 94 +++++++++++++++++++ .../columnar/offload/OffloadSingleNode.scala | 5 +- .../columnar/rewrite/RewriteSingleNode.scala | 8 +- .../gluten/execution/OffloadDeltaFilter.scala | 2 +- .../execution/OffloadDeltaProject.scala | 2 +- .../gluten/execution/OffloadDeltaScan.scala | 2 +- .../gluten/execution/OffloadHudiScan.scala | 2 +- .../gluten/execution/OffloadIcebergScan.scala | 2 +- .../gluten/execution/OffloadKafkaScan.scala | 2 +- .../offload/OffloadSingleNodeRules.scala | 6 +- .../rewrite/ProjectColumnPruning.scala | 4 +- .../columnar/rewrite/PullOutPostProject.scala | 4 +- .../columnar/rewrite/PullOutPreProject.scala | 4 +- .../columnar/rewrite/RewriteIn.scala | 4 +- .../columnar/rewrite/RewriteJoin.scala | 4 +- .../rewrite/RewriteMultiChildrenCount.scala | 4 +- 18 files changed, 131 insertions(+), 26 deletions(-) create mode 100644 gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala diff --git a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/execution/OffloadDeltaNode.scala b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/execution/OffloadDeltaNode.scala index bfde89733dd1..252d9e858a90 100644 --- a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/execution/OffloadDeltaNode.scala +++ b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/execution/OffloadDeltaNode.scala @@ -22,7 +22,7 @@ import org.apache.gluten.sql.shims.DeltaShimLoader import org.apache.spark.sql.execution.SparkPlan case class OffloadDeltaNode() extends OffloadSingleNode { - override def offload(plan: SparkPlan): SparkPlan = plan match { + override def offload0(plan: SparkPlan): SparkPlan = plan match { case plan if DeltaShimLoader.getDeltaShims.supportDeltaOptimizedWriterExec(plan) => DeltaShimLoader.getDeltaShims.offloadDeltaOptimizedWriterExec(plan) case other => other diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index dcf19204d2af..88e1c8e42b3a 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -23,6 +23,7 @@ import org.apache.gluten.execution._ import org.apache.gluten.expression._ import org.apache.gluten.expression.ExpressionNames.MONOTONICALLY_INCREASING_ID import org.apache.gluten.extension.ExpressionExtensionTrait +import org.apache.gluten.extension.columnar.SingleNodeOps import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode} @@ -271,7 +272,10 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { } } - override def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec): SparkPlan = { + override def genColumnarShuffleExchange(originalShuffle: ShuffleExchangeExec): SparkPlan = { + // CH backend needs to conditionally offload shuffle exchange based on children's information. + // So we call the restore API to get back the original plan. + val shuffle = SingleNodeOps.restoreHiddenChildren(originalShuffle) val child = shuffle.child if (CHValidatorApi.supportShuffleWithProject(shuffle.outputPartitioning, child)) { val (projectColumnNumber, newPartitioning, newChild) = diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala new file mode 100644 index 000000000000..24469acec214 --- /dev/null +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.extension.columnar + +import org.apache.gluten.execution.GlutenPlan +import org.apache.gluten.extension.columnar.transition.Convention + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} +import org.apache.spark.sql.vectorized.ColumnarBatch + +object SingleNodeOps { + implicit class SparkPlanOps(plan: SparkPlan) { + + /** + * Runs a rule function where node the input query plan is interested. All children will be + * replaced with 'DummyLeafExec' nodes so they are not accessible in the rule body. + */ + def applyOnNode(func: SparkPlan => SparkPlan): SparkPlan = { + val planWithChildrenHidden = hideChildren(plan) + val applied = func(planWithChildrenHidden) + val out = restoreHiddenChildren(applied) + out + } + } + + /** + * Replaces the children with 'DummyLeafExec' nodes so they become inaccessible afterward. Used + * when the children plan nodes can be dropped because not interested. + */ + def hideChildren[T <: SparkPlan](plan: T): T = { + plan.withNewChildren(plan.children.map(child => new DummyLeafExec(child))).asInstanceOf[T] + } + + /** + * Restores hidden children from the replaced 'DummyLeafExec' nodes. It's exposed only for + * compatibility reason. Not recommended to be used in formal rule code. + */ + def restoreHiddenChildren[T <: SparkPlan](plan: T): T = { + plan + .transformDown { + case d: DummyLeafExec => + d.hiddenPlan + case other => other + } + .asInstanceOf[T] + } + + /** + * The plan node that hides the real child plan node during #applyOnNode call. This is used when + * query planner doesn't allow a rule to access the child plan nodes from the input query plan + * node. + */ + private class DummyLeafExec(val hiddenPlan: SparkPlan) + extends LeafExecNode + with Convention.KnownBatchType + with Convention.KnownRowTypeForSpark33OrLater + with GlutenPlan.SupportsRowBasedCompatible { + private val conv: Convention = Convention.get(hiddenPlan) + + override def batchType(): Convention.BatchType = conv.batchType + override def rowType0(): Convention.RowType = conv.rowType + override def output: Seq[Attribute] = hiddenPlan.output + override def outputPartitioning: Partitioning = hiddenPlan.outputPartitioning + override def outputOrdering: Seq[SortOrder] = hiddenPlan.outputOrdering + + override protected def doExecute(): RDD[InternalRow] = + throw new UnsupportedOperationException("Not allowed in #applyOnNode call") + override protected def doExecuteColumnar(): RDD[ColumnarBatch] = + throw new UnsupportedOperationException("Not allowed in #applyOnNode call") + override protected[sql] def doExecuteBroadcast[T](): Broadcast[T] = + throw new UnsupportedOperationException("Not allowed in #applyOnNode call") + + override def canEqual(that: Any): Boolean = that.isInstanceOf[DummyLeafExec] + } +} diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala index 232973f53a5d..38d9186a4fb9 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala @@ -16,6 +16,8 @@ */ package org.apache.gluten.extension.columnar.offload +import org.apache.gluten.extension.columnar.SingleNodeOps._ + import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.SparkPlan @@ -27,5 +29,6 @@ import org.apache.spark.sql.execution.SparkPlan * the children node. Tree-walking is done by caller of this trait. */ trait OffloadSingleNode extends Logging { - def offload(plan: SparkPlan): SparkPlan + final def offload(plan: SparkPlan): SparkPlan = plan.applyOnNode(offload0) + protected[OffloadSingleNode] def offload0(plan: SparkPlan): SparkPlan } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSingleNode.scala index 38a8031a5cab..29a6aaff4c79 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSingleNode.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSingleNode.scala @@ -16,6 +16,8 @@ */ package org.apache.gluten.extension.columnar.rewrite +import org.apache.gluten.extension.columnar.SingleNodeOps._ + import org.apache.spark.sql.execution.SparkPlan /** @@ -30,6 +32,8 @@ import org.apache.spark.sql.execution.SparkPlan * TODO: Ideally for such API we'd better to allow multiple alternative outputs. */ trait RewriteSingleNode { - def isRewritable(plan: SparkPlan): Boolean - def rewrite(plan: SparkPlan): SparkPlan + final def isRewritable(plan: SparkPlan): Boolean = isRewritable0(hideChildren(plan)) + protected[RewriteSingleNode] def isRewritable0(plan: SparkPlan): Boolean + final def rewrite(plan: SparkPlan): SparkPlan = plan.applyOnNode(rewrite0) + protected[RewriteSingleNode] def rewrite0(plan: SparkPlan): SparkPlan } diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaFilter.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaFilter.scala index 1298830b0333..adb630a7dc03 100644 --- a/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaFilter.scala +++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaFilter.scala @@ -22,7 +22,7 @@ import org.apache.gluten.extension.columnar.offload.OffloadSingleNode import org.apache.spark.sql.execution.{FilterExec, SparkPlan} case class OffloadDeltaFilter() extends OffloadSingleNode { - override def offload(plan: SparkPlan): SparkPlan = plan match { + override def offload0(plan: SparkPlan): SparkPlan = plan match { case FilterExec(condition, child) if containsIncrementMetricExpr(condition) => DeltaFilterExecTransformer(condition, child) case p => p diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaProject.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaProject.scala index e9317c7ce81d..60add78566a7 100644 --- a/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaProject.scala +++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaProject.scala @@ -22,7 +22,7 @@ import org.apache.gluten.extension.columnar.offload.OffloadSingleNode import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} case class OffloadDeltaProject() extends OffloadSingleNode { - override def offload(plan: SparkPlan): SparkPlan = plan match { + override def offload0(plan: SparkPlan): SparkPlan = plan match { case ProjectExec(projectList, child) if projectList.exists(containsIncrementMetricExpr) => DeltaProjectExecTransformer(projectList, child) case p => p diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaScan.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaScan.scala index 0df71a631f43..6f2fb2fd0025 100644 --- a/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaScan.scala +++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaScan.scala @@ -21,7 +21,7 @@ import org.apache.gluten.extension.columnar.offload.OffloadSingleNode import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} case class OffloadDeltaScan() extends OffloadSingleNode { - override def offload(plan: SparkPlan): SparkPlan = plan match { + override def offload0(plan: SparkPlan): SparkPlan = plan match { case scan: FileSourceScanExec if scan.relation.fileFormat.getClass.getName == "org.apache.spark.sql.delta.DeltaParquetFileFormat" => diff --git a/gluten-hudi/src/main/scala/org/apache/gluten/execution/OffloadHudiScan.scala b/gluten-hudi/src/main/scala/org/apache/gluten/execution/OffloadHudiScan.scala index f937dba28c4e..f3ce00b40261 100644 --- a/gluten-hudi/src/main/scala/org/apache/gluten/execution/OffloadHudiScan.scala +++ b/gluten-hudi/src/main/scala/org/apache/gluten/execution/OffloadHudiScan.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.execution.SparkPlan /** Since https://github.com/apache/incubator-gluten/pull/6049. */ case class OffloadHudiScan() extends OffloadSingleNode { - override def offload(plan: SparkPlan): SparkPlan = { + override def offload0(plan: SparkPlan): SparkPlan = { plan match { // Hudi has multiple file format definitions whose names end with "HoodieParquetFileFormat". case scan: org.apache.spark.sql.execution.FileSourceScanExec diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala index 6747b79ffc2a..22417e338252 100644 --- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala +++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.v2.BatchScanExec case class OffloadIcebergScan() extends OffloadSingleNode { - override def offload(plan: SparkPlan): SparkPlan = plan match { + override def offload0(plan: SparkPlan): SparkPlan = plan match { case scan: BatchScanExec if IcebergScanTransformer.supportsBatchScan(scan.scan) => IcebergScanTransformer(scan) case other => other diff --git a/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala b/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala index 262fa82f20f9..34365e95b0ab 100644 --- a/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala +++ b/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, MicroBatchScanExec} case class OffloadKafkaScan() extends OffloadSingleNode { - override def offload(plan: SparkPlan): SparkPlan = plan match { + override def offload0(plan: SparkPlan): SparkPlan = plan match { case scan: MicroBatchScanExec if MicroBatchScanExecTransformer.supportsBatchScan(scan.scan) => MicroBatchScanExecTransformer(scan) case other => other diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala index d356a39b4103..a5be11258abd 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.hive.HiveTableScanExecTransformer // Exchange transformation. case class OffloadExchange() extends OffloadSingleNode with LogLevelUtil { - override def offload(plan: SparkPlan): SparkPlan = plan match { + override def offload0(plan: SparkPlan): SparkPlan = plan match { case p if FallbackTags.nonEmpty(p) => p case s: ShuffleExchangeExec => @@ -55,7 +55,7 @@ case class OffloadExchange() extends OffloadSingleNode with LogLevelUtil { // Join transformation. case class OffloadJoin() extends OffloadSingleNode with LogLevelUtil { - override def offload(plan: SparkPlan): SparkPlan = { + override def offload0(plan: SparkPlan): SparkPlan = { if (FallbackTags.nonEmpty(plan)) { logDebug(s"Columnar Processing for ${plan.getClass} is under row guard.") return plan @@ -177,7 +177,7 @@ case class OffloadOthers() extends OffloadSingleNode with LogLevelUtil { import OffloadOthers._ private val replace = new ReplaceSingleNode - override def offload(plan: SparkPlan): SparkPlan = replace.doReplace(plan) + override def offload0(plan: SparkPlan): SparkPlan = replace.doReplace(plan) } object OffloadOthers { diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/ProjectColumnPruning.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/ProjectColumnPruning.scala index 050b9c78da49..a22d6972b28a 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/ProjectColumnPruning.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/ProjectColumnPruning.scala @@ -23,11 +23,11 @@ import org.apache.spark.sql.execution.{ProjectExec, SparkPlan, UnaryExecNode} * consumed by the parent. These columns will be removed by this rewrite rule. */ object ProjectColumnPruning extends RewriteSingleNode { - override def isRewritable(plan: SparkPlan): Boolean = { + override def isRewritable0(plan: SparkPlan): Boolean = { RewriteEligibility.isRewritable(plan) } - override def rewrite(plan: SparkPlan): SparkPlan = plan match { + override def rewrite0(plan: SparkPlan): SparkPlan = plan match { case parent: UnaryExecNode if parent.child.isInstanceOf[ProjectExec] => val project = parent.child.asInstanceOf[ProjectExec] val unusedAttribute = project.outputSet -- (parent.references ++ parent.outputSet) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPostProject.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPostProject.scala index 1a3ecca16a00..d02ebe05d59e 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPostProject.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPostProject.scala @@ -33,7 +33,7 @@ import scala.collection.mutable.ArrayBuffer * when a fallback occurs. */ object PullOutPostProject extends RewriteSingleNode with PullOutProjectHelper { - override def isRewritable(plan: SparkPlan): Boolean = { + override def isRewritable0(plan: SparkPlan): Boolean = { RewriteEligibility.isRewritable(plan) } @@ -74,7 +74,7 @@ object PullOutPostProject extends RewriteSingleNode with PullOutProjectHelper { } } - override def rewrite(plan: SparkPlan): SparkPlan = plan match { + override def rewrite0(plan: SparkPlan): SparkPlan = plan match { case agg: BaseAggregateExec if supportedAggregate(agg) && needsPostProjection(agg) => val pullOutHelper = BackendsApiManager.getSparkPlanExecApiInstance.genHashAggregateExecPullOutHelper( diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala index 7f32014c2478..8044a2dbaa3b 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala @@ -37,7 +37,7 @@ import scala.collection.mutable * execution by the native engine. */ object PullOutPreProject extends RewriteSingleNode with PullOutProjectHelper { - override def isRewritable(plan: SparkPlan): Boolean = { + override def isRewritable0(plan: SparkPlan): Boolean = { RewriteEligibility.isRewritable(plan) } @@ -122,7 +122,7 @@ object PullOutPreProject extends RewriteSingleNode with PullOutProjectHelper { } } - override def rewrite(plan: SparkPlan): SparkPlan = plan match { + override def rewrite0(plan: SparkPlan): SparkPlan = plan match { case sort: SortExec if needsPreProject(sort) => val expressionMap = new mutable.HashMap[Expression, NamedExpression]() val newSortOrder = getNewSortOrder(sort.sortOrder, expressionMap) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteIn.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteIn.scala index 5a28576750ab..2c8255bb9269 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteIn.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteIn.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.types.StructType * TODO: Remove this rule once Velox support the list option in `In` is not literal. */ object RewriteIn extends RewriteSingleNode { - override def isRewritable(plan: SparkPlan): Boolean = { + override def isRewritable0(plan: SparkPlan): Boolean = { RewriteEligibility.isRewritable(plan) } @@ -60,7 +60,7 @@ object RewriteIn extends RewriteSingleNode { } } - override def rewrite(plan: SparkPlan): SparkPlan = { + override def rewrite0(plan: SparkPlan): SparkPlan = { plan match { // TODO: Support datasource v2 case scan: FileSourceScanExec if scan.dataFilters.exists(_.find(shouldRewrite).isDefined) => diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala index 42bbbba39d02..592c017f00da 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoin /** If force ShuffledHashJoin, convert [[SortMergeJoinExec]] to [[ShuffledHashJoinExec]]. */ object RewriteJoin extends RewriteSingleNode with JoinSelectionHelper { - override def isRewritable(plan: SparkPlan): Boolean = { + override def isRewritable0(plan: SparkPlan): Boolean = { RewriteEligibility.isRewritable(plan) } @@ -55,7 +55,7 @@ object RewriteJoin extends RewriteSingleNode with JoinSelectionHelper { Some(side) } - override def rewrite(plan: SparkPlan): SparkPlan = plan match { + override def rewrite0(plan: SparkPlan): SparkPlan = plan match { case smj: SortMergeJoinExec if GlutenConfig.get.forceShuffledHashJoin => getSmjBuildSide(smj) match { case Some(buildSide) => diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteMultiChildrenCount.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteMultiChildrenCount.scala index 1d11dcc91795..6606548dfb16 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteMultiChildrenCount.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteMultiChildrenCount.scala @@ -48,7 +48,7 @@ import org.apache.spark.sql.types.IntegerType object RewriteMultiChildrenCount extends RewriteSingleNode with PullOutProjectHelper { private lazy val shouldRewriteCount = BackendsApiManager.getSettings.shouldRewriteCount() - override def isRewritable(plan: SparkPlan): Boolean = { + override def isRewritable0(plan: SparkPlan): Boolean = { RewriteEligibility.isRewritable(plan) } @@ -95,7 +95,7 @@ object RewriteMultiChildrenCount extends RewriteSingleNode with PullOutProjectHe } } - override def rewrite(plan: SparkPlan): SparkPlan = { + override def rewrite0(plan: SparkPlan): SparkPlan = { if (!shouldRewriteCount) { return plan } From 5d338adcc7923e6296d2c604607ef5dd900f9f97 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 3 Apr 2025 17:49:17 +0200 Subject: [PATCH 03/14] fixup --- .../gluten/extension/columnar/SingleNodeOps.scala | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala index 24469acec214..0f495980817d 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala @@ -18,7 +18,6 @@ package org.apache.gluten.extension.columnar import org.apache.gluten.execution.GlutenPlan import org.apache.gluten.extension.columnar.transition.Convention - import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -69,11 +68,9 @@ object SingleNodeOps { * query planner doesn't allow a rule to access the child plan nodes from the input query plan * node. */ - private class DummyLeafExec(val hiddenPlan: SparkPlan) + private case class DummyLeafExec(hiddenPlan: SparkPlan) extends LeafExecNode - with Convention.KnownBatchType - with Convention.KnownRowTypeForSpark33OrLater - with GlutenPlan.SupportsRowBasedCompatible { + with GlutenPlan { private val conv: Convention = Convention.get(hiddenPlan) override def batchType(): Convention.BatchType = conv.batchType @@ -82,13 +79,11 @@ object SingleNodeOps { override def outputPartitioning: Partitioning = hiddenPlan.outputPartitioning override def outputOrdering: Seq[SortOrder] = hiddenPlan.outputOrdering - override protected def doExecute(): RDD[InternalRow] = + override def doExecute(): RDD[InternalRow] = throw new UnsupportedOperationException("Not allowed in #applyOnNode call") - override protected def doExecuteColumnar(): RDD[ColumnarBatch] = + override def doExecuteColumnar(): RDD[ColumnarBatch] = throw new UnsupportedOperationException("Not allowed in #applyOnNode call") - override protected[sql] def doExecuteBroadcast[T](): Broadcast[T] = + override def doExecuteBroadcast[T](): Broadcast[T] = throw new UnsupportedOperationException("Not allowed in #applyOnNode call") - - override def canEqual(that: Any): Boolean = that.isInstanceOf[DummyLeafExec] } } From 7b923804aa4ca78912e69a5b2d656af69d89164d Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 3 Apr 2025 17:52:25 +0200 Subject: [PATCH 04/14] fixup --- .../org/apache/gluten/extension/columnar/SingleNodeOps.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala index 0f495980817d..9e6a97ddd751 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala @@ -71,7 +71,7 @@ object SingleNodeOps { private case class DummyLeafExec(hiddenPlan: SparkPlan) extends LeafExecNode with GlutenPlan { - private val conv: Convention = Convention.get(hiddenPlan) + private lazy val conv: Convention = Convention.get(hiddenPlan) override def batchType(): Convention.BatchType = conv.batchType override def rowType0(): Convention.RowType = conv.rowType From b6c2358083c53b0c27a2ab51f47561146fb974ca Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 3 Apr 2025 17:57:04 +0200 Subject: [PATCH 05/14] fixup --- .../extension/columnar/offload/OffloadSingleNodeRules.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala index a5be11258abd..bedecf1de374 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala @@ -184,7 +184,7 @@ object OffloadOthers { // Utility to replace single node within transformed Gluten node. // Children will be preserved as they are as children of the output node. // - // Do not look up on children on the input node in this rule. Otherwise + // Do not look up on children on the input node in this rule. Otherwise, // it may break RAS which would group all the possible input nodes to // search for validate candidates. private class ReplaceSingleNode extends LogLevelUtil with Logging { From aa507d6cc2ab830ad2e6ff02fe4eab6f2e56f43f Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 3 Apr 2025 18:11:24 +0200 Subject: [PATCH 06/14] fixup --- .../org/apache/gluten/extension/columnar/SingleNodeOps.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala index 9e6a97ddd751..f1d4084b0459 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala @@ -18,6 +18,7 @@ package org.apache.gluten.extension.columnar import org.apache.gluten.execution.GlutenPlan import org.apache.gluten.extension.columnar.transition.Convention + import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow @@ -68,9 +69,7 @@ object SingleNodeOps { * query planner doesn't allow a rule to access the child plan nodes from the input query plan * node. */ - private case class DummyLeafExec(hiddenPlan: SparkPlan) - extends LeafExecNode - with GlutenPlan { + private case class DummyLeafExec(hiddenPlan: SparkPlan) extends LeafExecNode with GlutenPlan { private lazy val conv: Convention = Convention.get(hiddenPlan) override def batchType(): Convention.BatchType = conv.batchType From 86f2244a3a051dea1d2cf34bafc180846a0665d8 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 3 Apr 2025 20:30:09 +0200 Subject: [PATCH 07/14] fixup --- .../gluten/execution/OffloadDeltaNode.scala | 2 +- .../clickhouse/CHSparkPlanExecApi.scala | 6 +- .../backendsapi/velox/VeloxRuleApi.scala | 2 +- .../extension/columnar/SingleNodeOps.scala | 88 ------------------- .../columnar/offload/OffloadSingleNode.scala | 82 ++++++++++++++++- .../columnar/rewrite/RewriteSingleNode.scala | 8 +- .../gluten/execution/OffloadDeltaFilter.scala | 2 +- .../execution/OffloadDeltaProject.scala | 2 +- .../gluten/execution/OffloadDeltaScan.scala | 2 +- .../gluten/execution/OffloadHudiScan.scala | 2 +- .../gluten/execution/OffloadIcebergScan.scala | 2 +- .../gluten/execution/OffloadKafkaScan.scala | 2 +- .../offload/OffloadSingleNodeRules.scala | 6 +- .../rewrite/ProjectColumnPruning.scala | 4 +- .../columnar/rewrite/PullOutPostProject.scala | 4 +- .../columnar/rewrite/PullOutPreProject.scala | 4 +- .../columnar/rewrite/RewriteIn.scala | 4 +- .../columnar/rewrite/RewriteJoin.scala | 4 +- .../rewrite/RewriteMultiChildrenCount.scala | 4 +- 19 files changed, 104 insertions(+), 126 deletions(-) delete mode 100644 gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala diff --git a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/execution/OffloadDeltaNode.scala b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/execution/OffloadDeltaNode.scala index 252d9e858a90..bfde89733dd1 100644 --- a/backends-clickhouse/src-delta/main/scala/org/apache/gluten/execution/OffloadDeltaNode.scala +++ b/backends-clickhouse/src-delta/main/scala/org/apache/gluten/execution/OffloadDeltaNode.scala @@ -22,7 +22,7 @@ import org.apache.gluten.sql.shims.DeltaShimLoader import org.apache.spark.sql.execution.SparkPlan case class OffloadDeltaNode() extends OffloadSingleNode { - override def offload0(plan: SparkPlan): SparkPlan = plan match { + override def offload(plan: SparkPlan): SparkPlan = plan match { case plan if DeltaShimLoader.getDeltaShims.supportDeltaOptimizedWriterExec(plan) => DeltaShimLoader.getDeltaShims.offloadDeltaOptimizedWriterExec(plan) case other => other diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala index 88e1c8e42b3a..dcf19204d2af 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHSparkPlanExecApi.scala @@ -23,7 +23,6 @@ import org.apache.gluten.execution._ import org.apache.gluten.expression._ import org.apache.gluten.expression.ExpressionNames.MONOTONICALLY_INCREASING_ID import org.apache.gluten.extension.ExpressionExtensionTrait -import org.apache.gluten.extension.columnar.SingleNodeOps import org.apache.gluten.extension.columnar.heuristic.HeuristicTransform import org.apache.gluten.sql.shims.SparkShimLoader import org.apache.gluten.substrait.expression.{ExpressionBuilder, ExpressionNode, WindowFunctionNode} @@ -272,10 +271,7 @@ class CHSparkPlanExecApi extends SparkPlanExecApi with Logging { } } - override def genColumnarShuffleExchange(originalShuffle: ShuffleExchangeExec): SparkPlan = { - // CH backend needs to conditionally offload shuffle exchange based on children's information. - // So we call the restore API to get back the original plan. - val shuffle = SingleNodeOps.restoreHiddenChildren(originalShuffle) + override def genColumnarShuffleExchange(shuffle: ShuffleExchangeExec): SparkPlan = { val child = shuffle.child if (CHValidatorApi.supportShuffleWithProject(shuffle.outputPartitioning, child)) { val (projectColumnNumber, newPartitioning, newChild) = diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index 82a20b8cc56c..04dfbf255898 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -74,7 +74,7 @@ object VeloxRuleApi { injector.injectPreTransform(c => ArrowScanReplaceRule.apply(c.session)) // Legacy: The legacy transform rule. - val offloads = Seq(OffloadOthers(), OffloadExchange(), OffloadJoin()) + val offloads = Seq(OffloadOthers(), OffloadExchange(), OffloadJoin()).map(_.toStrcitRule()) val validatorBuilder: GlutenConfig => Validator = conf => Validators.newValidator(conf, offloads) val rewrites = diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala deleted file mode 100644 index f1d4084b0459..000000000000 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/SingleNodeOps.scala +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gluten.extension.columnar - -import org.apache.gluten.execution.GlutenPlan -import org.apache.gluten.extension.columnar.transition.Convention - -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} -import org.apache.spark.sql.catalyst.plans.physical.Partitioning -import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} -import org.apache.spark.sql.vectorized.ColumnarBatch - -object SingleNodeOps { - implicit class SparkPlanOps(plan: SparkPlan) { - - /** - * Runs a rule function where node the input query plan is interested. All children will be - * replaced with 'DummyLeafExec' nodes so they are not accessible in the rule body. - */ - def applyOnNode(func: SparkPlan => SparkPlan): SparkPlan = { - val planWithChildrenHidden = hideChildren(plan) - val applied = func(planWithChildrenHidden) - val out = restoreHiddenChildren(applied) - out - } - } - - /** - * Replaces the children with 'DummyLeafExec' nodes so they become inaccessible afterward. Used - * when the children plan nodes can be dropped because not interested. - */ - def hideChildren[T <: SparkPlan](plan: T): T = { - plan.withNewChildren(plan.children.map(child => new DummyLeafExec(child))).asInstanceOf[T] - } - - /** - * Restores hidden children from the replaced 'DummyLeafExec' nodes. It's exposed only for - * compatibility reason. Not recommended to be used in formal rule code. - */ - def restoreHiddenChildren[T <: SparkPlan](plan: T): T = { - plan - .transformDown { - case d: DummyLeafExec => - d.hiddenPlan - case other => other - } - .asInstanceOf[T] - } - - /** - * The plan node that hides the real child plan node during #applyOnNode call. This is used when - * query planner doesn't allow a rule to access the child plan nodes from the input query plan - * node. - */ - private case class DummyLeafExec(hiddenPlan: SparkPlan) extends LeafExecNode with GlutenPlan { - private lazy val conv: Convention = Convention.get(hiddenPlan) - - override def batchType(): Convention.BatchType = conv.batchType - override def rowType0(): Convention.RowType = conv.rowType - override def output: Seq[Attribute] = hiddenPlan.output - override def outputPartitioning: Partitioning = hiddenPlan.outputPartitioning - override def outputOrdering: Seq[SortOrder] = hiddenPlan.outputOrdering - - override def doExecute(): RDD[InternalRow] = - throw new UnsupportedOperationException("Not allowed in #applyOnNode call") - override def doExecuteColumnar(): RDD[ColumnarBatch] = - throw new UnsupportedOperationException("Not allowed in #applyOnNode call") - override def doExecuteBroadcast[T](): Broadcast[T] = - throw new UnsupportedOperationException("Not allowed in #applyOnNode call") - } -} diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala index 38d9186a4fb9..58586caf8816 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala @@ -16,10 +16,17 @@ */ package org.apache.gluten.extension.columnar.offload -import org.apache.gluten.extension.columnar.SingleNodeOps._ +import org.apache.gluten.execution.GlutenPlan +import org.apache.gluten.extension.columnar.transition.Convention +import org.apache.spark.broadcast.Broadcast import org.apache.spark.internal.Logging -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan} +import org.apache.spark.sql.vectorized.ColumnarBatch /** * Converts a vanilla Spark plan node into Gluten plan node. Gluten plan is supposed to be executed @@ -29,6 +36,73 @@ import org.apache.spark.sql.execution.SparkPlan * the children node. Tree-walking is done by caller of this trait. */ trait OffloadSingleNode extends Logging { - final def offload(plan: SparkPlan): SparkPlan = plan.applyOnNode(offload0) - protected[OffloadSingleNode] def offload0(plan: SparkPlan): SparkPlan + def offload(plan: SparkPlan): SparkPlan +} + +object OffloadSingleNode { + implicit class OffloadSingleNodeOps(rule: OffloadSingleNode) { + + /** + * Converts the [[OffloadSingleNode]] rule to strict version. + * + * In the strict version rule, all children of the input query plan node will be replaced with + * 'DummyLeafExec' nodes so they are not accessible in the rule body. + */ + def toStrcitRule(): OffloadSingleNode = { + new StrictRule(rule); + } + } + + private class StrictRule(delegate: OffloadSingleNode) extends OffloadSingleNode { + override def offload(plan: SparkPlan): SparkPlan = { + val planWithChildrenHidden = hideChildren(plan) + val applied = delegate.offload(planWithChildrenHidden) + val out = restoreHiddenChildren(applied) + out + } + + /** + * Replaces the children with 'DummyLeafExec' nodes so they become inaccessible afterward. Used + * when the children plan nodes can be dropped because not interested. + */ + private def hideChildren[T <: SparkPlan](plan: T): T = { + plan.withNewChildren(plan.children.map(child => new DummyLeafExec(child))).asInstanceOf[T] + } + + /** + * Restores hidden children from the replaced 'DummyLeafExec' nodes. It's exposed only for + * compatibility reason. Not recommended to be used in formal rule code. + */ + private def restoreHiddenChildren[T <: SparkPlan](plan: T): T = { + plan + .transformDown { + case d: DummyLeafExec => + d.hiddenPlan + case other => other + } + .asInstanceOf[T] + } + } + + /** + * The plan node that hides the real child plan node during #applyOnNode call. This is used when + * query planner doesn't allow a rule to access the child plan nodes from the input query plan + * node. + */ + private case class DummyLeafExec(hiddenPlan: SparkPlan) extends LeafExecNode with GlutenPlan { + private lazy val conv: Convention = Convention.get(hiddenPlan) + + override def batchType(): Convention.BatchType = conv.batchType + override def rowType0(): Convention.RowType = conv.rowType + override def output: Seq[Attribute] = hiddenPlan.output + override def outputPartitioning: Partitioning = hiddenPlan.outputPartitioning + override def outputOrdering: Seq[SortOrder] = hiddenPlan.outputOrdering + + override def doExecute(): RDD[InternalRow] = + throw new UnsupportedOperationException("Not allowed in #applyOnNode call") + override def doExecuteColumnar(): RDD[ColumnarBatch] = + throw new UnsupportedOperationException("Not allowed in #applyOnNode call") + override def doExecuteBroadcast[T](): Broadcast[T] = + throw new UnsupportedOperationException("Not allowed in #applyOnNode call") + } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSingleNode.scala index 29a6aaff4c79..38a8031a5cab 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSingleNode.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSingleNode.scala @@ -16,8 +16,6 @@ */ package org.apache.gluten.extension.columnar.rewrite -import org.apache.gluten.extension.columnar.SingleNodeOps._ - import org.apache.spark.sql.execution.SparkPlan /** @@ -32,8 +30,6 @@ import org.apache.spark.sql.execution.SparkPlan * TODO: Ideally for such API we'd better to allow multiple alternative outputs. */ trait RewriteSingleNode { - final def isRewritable(plan: SparkPlan): Boolean = isRewritable0(hideChildren(plan)) - protected[RewriteSingleNode] def isRewritable0(plan: SparkPlan): Boolean - final def rewrite(plan: SparkPlan): SparkPlan = plan.applyOnNode(rewrite0) - protected[RewriteSingleNode] def rewrite0(plan: SparkPlan): SparkPlan + def isRewritable(plan: SparkPlan): Boolean + def rewrite(plan: SparkPlan): SparkPlan } diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaFilter.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaFilter.scala index adb630a7dc03..1298830b0333 100644 --- a/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaFilter.scala +++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaFilter.scala @@ -22,7 +22,7 @@ import org.apache.gluten.extension.columnar.offload.OffloadSingleNode import org.apache.spark.sql.execution.{FilterExec, SparkPlan} case class OffloadDeltaFilter() extends OffloadSingleNode { - override def offload0(plan: SparkPlan): SparkPlan = plan match { + override def offload(plan: SparkPlan): SparkPlan = plan match { case FilterExec(condition, child) if containsIncrementMetricExpr(condition) => DeltaFilterExecTransformer(condition, child) case p => p diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaProject.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaProject.scala index 60add78566a7..e9317c7ce81d 100644 --- a/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaProject.scala +++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaProject.scala @@ -22,7 +22,7 @@ import org.apache.gluten.extension.columnar.offload.OffloadSingleNode import org.apache.spark.sql.execution.{ProjectExec, SparkPlan} case class OffloadDeltaProject() extends OffloadSingleNode { - override def offload0(plan: SparkPlan): SparkPlan = plan match { + override def offload(plan: SparkPlan): SparkPlan = plan match { case ProjectExec(projectList, child) if projectList.exists(containsIncrementMetricExpr) => DeltaProjectExecTransformer(projectList, child) case p => p diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaScan.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaScan.scala index 6f2fb2fd0025..0df71a631f43 100644 --- a/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaScan.scala +++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/OffloadDeltaScan.scala @@ -21,7 +21,7 @@ import org.apache.gluten.extension.columnar.offload.OffloadSingleNode import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan} case class OffloadDeltaScan() extends OffloadSingleNode { - override def offload0(plan: SparkPlan): SparkPlan = plan match { + override def offload(plan: SparkPlan): SparkPlan = plan match { case scan: FileSourceScanExec if scan.relation.fileFormat.getClass.getName == "org.apache.spark.sql.delta.DeltaParquetFileFormat" => diff --git a/gluten-hudi/src/main/scala/org/apache/gluten/execution/OffloadHudiScan.scala b/gluten-hudi/src/main/scala/org/apache/gluten/execution/OffloadHudiScan.scala index f3ce00b40261..f937dba28c4e 100644 --- a/gluten-hudi/src/main/scala/org/apache/gluten/execution/OffloadHudiScan.scala +++ b/gluten-hudi/src/main/scala/org/apache/gluten/execution/OffloadHudiScan.scala @@ -22,7 +22,7 @@ import org.apache.spark.sql.execution.SparkPlan /** Since https://github.com/apache/incubator-gluten/pull/6049. */ case class OffloadHudiScan() extends OffloadSingleNode { - override def offload0(plan: SparkPlan): SparkPlan = { + override def offload(plan: SparkPlan): SparkPlan = { plan match { // Hudi has multiple file format definitions whose names end with "HoodieParquetFileFormat". case scan: org.apache.spark.sql.execution.FileSourceScanExec diff --git a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala index 22417e338252..6747b79ffc2a 100644 --- a/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala +++ b/gluten-iceberg/src/main/scala/org/apache/gluten/execution/OffloadIcebergScan.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.v2.BatchScanExec case class OffloadIcebergScan() extends OffloadSingleNode { - override def offload0(plan: SparkPlan): SparkPlan = plan match { + override def offload(plan: SparkPlan): SparkPlan = plan match { case scan: BatchScanExec if IcebergScanTransformer.supportsBatchScan(scan.scan) => IcebergScanTransformer(scan) case other => other diff --git a/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala b/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala index 34365e95b0ab..262fa82f20f9 100644 --- a/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala +++ b/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.datasources.v2.{BatchScanExec, MicroBatchScanExec} case class OffloadKafkaScan() extends OffloadSingleNode { - override def offload0(plan: SparkPlan): SparkPlan = plan match { + override def offload(plan: SparkPlan): SparkPlan = plan match { case scan: MicroBatchScanExec if MicroBatchScanExecTransformer.supportsBatchScan(scan.scan) => MicroBatchScanExecTransformer(scan) case other => other diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala index bedecf1de374..611b2a8ff71f 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala @@ -39,7 +39,7 @@ import org.apache.spark.sql.hive.HiveTableScanExecTransformer // Exchange transformation. case class OffloadExchange() extends OffloadSingleNode with LogLevelUtil { - override def offload0(plan: SparkPlan): SparkPlan = plan match { + override def offload(plan: SparkPlan): SparkPlan = plan match { case p if FallbackTags.nonEmpty(p) => p case s: ShuffleExchangeExec => @@ -55,7 +55,7 @@ case class OffloadExchange() extends OffloadSingleNode with LogLevelUtil { // Join transformation. case class OffloadJoin() extends OffloadSingleNode with LogLevelUtil { - override def offload0(plan: SparkPlan): SparkPlan = { + override def offload(plan: SparkPlan): SparkPlan = { if (FallbackTags.nonEmpty(plan)) { logDebug(s"Columnar Processing for ${plan.getClass} is under row guard.") return plan @@ -177,7 +177,7 @@ case class OffloadOthers() extends OffloadSingleNode with LogLevelUtil { import OffloadOthers._ private val replace = new ReplaceSingleNode - override def offload0(plan: SparkPlan): SparkPlan = replace.doReplace(plan) + override def offload(plan: SparkPlan): SparkPlan = replace.doReplace(plan) } object OffloadOthers { diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/ProjectColumnPruning.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/ProjectColumnPruning.scala index a22d6972b28a..050b9c78da49 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/ProjectColumnPruning.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/ProjectColumnPruning.scala @@ -23,11 +23,11 @@ import org.apache.spark.sql.execution.{ProjectExec, SparkPlan, UnaryExecNode} * consumed by the parent. These columns will be removed by this rewrite rule. */ object ProjectColumnPruning extends RewriteSingleNode { - override def isRewritable0(plan: SparkPlan): Boolean = { + override def isRewritable(plan: SparkPlan): Boolean = { RewriteEligibility.isRewritable(plan) } - override def rewrite0(plan: SparkPlan): SparkPlan = plan match { + override def rewrite(plan: SparkPlan): SparkPlan = plan match { case parent: UnaryExecNode if parent.child.isInstanceOf[ProjectExec] => val project = parent.child.asInstanceOf[ProjectExec] val unusedAttribute = project.outputSet -- (parent.references ++ parent.outputSet) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPostProject.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPostProject.scala index d02ebe05d59e..1a3ecca16a00 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPostProject.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPostProject.scala @@ -33,7 +33,7 @@ import scala.collection.mutable.ArrayBuffer * when a fallback occurs. */ object PullOutPostProject extends RewriteSingleNode with PullOutProjectHelper { - override def isRewritable0(plan: SparkPlan): Boolean = { + override def isRewritable(plan: SparkPlan): Boolean = { RewriteEligibility.isRewritable(plan) } @@ -74,7 +74,7 @@ object PullOutPostProject extends RewriteSingleNode with PullOutProjectHelper { } } - override def rewrite0(plan: SparkPlan): SparkPlan = plan match { + override def rewrite(plan: SparkPlan): SparkPlan = plan match { case agg: BaseAggregateExec if supportedAggregate(agg) && needsPostProjection(agg) => val pullOutHelper = BackendsApiManager.getSparkPlanExecApiInstance.genHashAggregateExecPullOutHelper( diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala index 8044a2dbaa3b..7f32014c2478 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala @@ -37,7 +37,7 @@ import scala.collection.mutable * execution by the native engine. */ object PullOutPreProject extends RewriteSingleNode with PullOutProjectHelper { - override def isRewritable0(plan: SparkPlan): Boolean = { + override def isRewritable(plan: SparkPlan): Boolean = { RewriteEligibility.isRewritable(plan) } @@ -122,7 +122,7 @@ object PullOutPreProject extends RewriteSingleNode with PullOutProjectHelper { } } - override def rewrite0(plan: SparkPlan): SparkPlan = plan match { + override def rewrite(plan: SparkPlan): SparkPlan = plan match { case sort: SortExec if needsPreProject(sort) => val expressionMap = new mutable.HashMap[Expression, NamedExpression]() val newSortOrder = getNewSortOrder(sort.sortOrder, expressionMap) diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteIn.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteIn.scala index 2c8255bb9269..5a28576750ab 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteIn.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteIn.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.types.StructType * TODO: Remove this rule once Velox support the list option in `In` is not literal. */ object RewriteIn extends RewriteSingleNode { - override def isRewritable0(plan: SparkPlan): Boolean = { + override def isRewritable(plan: SparkPlan): Boolean = { RewriteEligibility.isRewritable(plan) } @@ -60,7 +60,7 @@ object RewriteIn extends RewriteSingleNode { } } - override def rewrite0(plan: SparkPlan): SparkPlan = { + override def rewrite(plan: SparkPlan): SparkPlan = { plan match { // TODO: Support datasource v2 case scan: FileSourceScanExec if scan.dataFilters.exists(_.find(shouldRewrite).isDefined) => diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala index 592c017f00da..42bbbba39d02 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteJoin.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.execution.joins.{ShuffledHashJoinExec, SortMergeJoin /** If force ShuffledHashJoin, convert [[SortMergeJoinExec]] to [[ShuffledHashJoinExec]]. */ object RewriteJoin extends RewriteSingleNode with JoinSelectionHelper { - override def isRewritable0(plan: SparkPlan): Boolean = { + override def isRewritable(plan: SparkPlan): Boolean = { RewriteEligibility.isRewritable(plan) } @@ -55,7 +55,7 @@ object RewriteJoin extends RewriteSingleNode with JoinSelectionHelper { Some(side) } - override def rewrite0(plan: SparkPlan): SparkPlan = plan match { + override def rewrite(plan: SparkPlan): SparkPlan = plan match { case smj: SortMergeJoinExec if GlutenConfig.get.forceShuffledHashJoin => getSmjBuildSide(smj) match { case Some(buildSide) => diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteMultiChildrenCount.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteMultiChildrenCount.scala index 6606548dfb16..1d11dcc91795 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteMultiChildrenCount.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteMultiChildrenCount.scala @@ -48,7 +48,7 @@ import org.apache.spark.sql.types.IntegerType object RewriteMultiChildrenCount extends RewriteSingleNode with PullOutProjectHelper { private lazy val shouldRewriteCount = BackendsApiManager.getSettings.shouldRewriteCount() - override def isRewritable0(plan: SparkPlan): Boolean = { + override def isRewritable(plan: SparkPlan): Boolean = { RewriteEligibility.isRewritable(plan) } @@ -95,7 +95,7 @@ object RewriteMultiChildrenCount extends RewriteSingleNode with PullOutProjectHe } } - override def rewrite0(plan: SparkPlan): SparkPlan = { + override def rewrite(plan: SparkPlan): SparkPlan = { if (!shouldRewriteCount) { return plan } From d5abed3bd284bf42ec7fc79da37a26ba39173e94 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 3 Apr 2025 20:32:33 +0200 Subject: [PATCH 08/14] fixup --- .../extension/columnar/offload/OffloadSingleNode.scala | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala index 58586caf8816..14aaaa8ab1d2 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala @@ -43,10 +43,10 @@ object OffloadSingleNode { implicit class OffloadSingleNodeOps(rule: OffloadSingleNode) { /** - * Converts the [[OffloadSingleNode]] rule to strict version. + * Converts the [[OffloadSingleNode]] rule to a strict version. * - * In the strict version rule, all children of the input query plan node will be replaced with - * 'DummyLeafExec' nodes so they are not accessible in the rule body. + * In the strict version of the rule, all children of the input query plan node will be replaced with + * 'DummyLeafExec' nodes so they are not accessible from the rule body. */ def toStrcitRule(): OffloadSingleNode = { new StrictRule(rule); @@ -70,8 +70,7 @@ object OffloadSingleNode { } /** - * Restores hidden children from the replaced 'DummyLeafExec' nodes. It's exposed only for - * compatibility reason. Not recommended to be used in formal rule code. + * Restores hidden children from the replaced 'DummyLeafExec' nodes. */ private def restoreHiddenChildren[T <: SparkPlan](plan: T): T = { plan From a322714ffef1733e42d8e8face9ee8e49af7ac5d Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Thu, 3 Apr 2025 20:32:59 +0200 Subject: [PATCH 09/14] fixup --- .../extension/columnar/offload/OffloadSingleNode.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala index 14aaaa8ab1d2..5380f4cd6bf2 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala @@ -45,8 +45,8 @@ object OffloadSingleNode { /** * Converts the [[OffloadSingleNode]] rule to a strict version. * - * In the strict version of the rule, all children of the input query plan node will be replaced with - * 'DummyLeafExec' nodes so they are not accessible from the rule body. + * In the strict version of the rule, all children of the input query plan node will be replaced + * with 'DummyLeafExec' nodes so they are not accessible from the rule body. */ def toStrcitRule(): OffloadSingleNode = { new StrictRule(rule); @@ -69,9 +69,7 @@ object OffloadSingleNode { plan.withNewChildren(plan.children.map(child => new DummyLeafExec(child))).asInstanceOf[T] } - /** - * Restores hidden children from the replaced 'DummyLeafExec' nodes. - */ + /** Restores hidden children from the replaced 'DummyLeafExec' nodes. */ private def restoreHiddenChildren[T <: SparkPlan](plan: T): T = { plan .transformDown { From fbf8608905734e8c46d3d817bf495b43c2e8d7db Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 4 Apr 2025 11:01:23 +0200 Subject: [PATCH 10/14] fixup --- .../extension/columnar/offload/OffloadSingleNode.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala index 5380f4cd6bf2..ec49bc4da1af 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala @@ -66,7 +66,13 @@ object OffloadSingleNode { * when the children plan nodes can be dropped because not interested. */ private def hideChildren[T <: SparkPlan](plan: T): T = { - plan.withNewChildren(plan.children.map(child => new DummyLeafExec(child))).asInstanceOf[T] + plan.withNewChildren( + plan.children.map { child => + val dummyLeaf = DummyLeafExec(child) + child.logicalLink.foreach(dummyLeaf.setLogicalLink) + dummyLeaf + } + ).asInstanceOf[T] } /** Restores hidden children from the replaced 'DummyLeafExec' nodes. */ From 428109c7710cf4e65f7ebc853993a6d40708706c Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 4 Apr 2025 11:09:57 +0200 Subject: [PATCH 11/14] fixup --- .../columnar/offload/OffloadSingleNode.scala | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala index ec49bc4da1af..4cf3e1d0c95d 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala @@ -66,13 +66,16 @@ object OffloadSingleNode { * when the children plan nodes can be dropped because not interested. */ private def hideChildren[T <: SparkPlan](plan: T): T = { - plan.withNewChildren( - plan.children.map { child => - val dummyLeaf = DummyLeafExec(child) - child.logicalLink.foreach(dummyLeaf.setLogicalLink) - dummyLeaf - } - ).asInstanceOf[T] + plan + .withNewChildren( + plan.children.map { + child => + val dummyLeaf = DummyLeafExec(child) + child.logicalLink.foreach(dummyLeaf.setLogicalLink) + dummyLeaf + } + ) + .asInstanceOf[T] } /** Restores hidden children from the replaced 'DummyLeafExec' nodes. */ From 49a366fcc3e5d726dda1e333aa24b1d4e09e5457 Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 4 Apr 2025 11:24:27 +0200 Subject: [PATCH 12/14] fixup --- .../scala/org/apache/gluten/component/VeloxDeltaComponent.scala | 1 + .../scala/org/apache/gluten/execution/OffloadKafkaScan.scala | 1 + 2 files changed, 2 insertions(+) diff --git a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala index 4e5ec88c00ac..d157d7b72532 100644 --- a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala +++ b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala @@ -37,6 +37,7 @@ class VeloxDeltaComponent extends Component { legacy.injectTransform { c => val offload = Seq(OffloadDeltaScan(), OffloadDeltaProject(), OffloadDeltaFilter()) + .map(_.toStrcitRule()) HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, offload), offload) } val offloads: Seq[RasOffload] = Seq( diff --git a/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala b/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala index 262fa82f20f9..6b8ead8c3078 100644 --- a/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala +++ b/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala @@ -39,6 +39,7 @@ object OffloadKafkaScan { injector.gluten.legacy.injectTransform { c => val offload = Seq(OffloadKafkaScan()) + .map(_.toStrcitRule()) HeuristicTransform.Simple( Validators.newValidator(c.glutenConf, offload), offload From 5bef42b0c8042d036b63480db1df8ab293276d5f Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 4 Apr 2025 15:09:00 +0200 Subject: [PATCH 13/14] fixup --- .../scala/org/apache/gluten/component/VeloxHudiComponent.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala b/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala index c9eeabcdfe01..b11645efbd50 100644 --- a/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala +++ b/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala @@ -36,7 +36,7 @@ class VeloxHudiComponent extends Component { val ras = injector.gluten.ras legacy.injectTransform { c => - val offload = Seq(OffloadHudiScan()) + val offload = Seq(OffloadHudiScan()).map(_.toStrcitRule()) HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, offload), offload) } ras.injectRasRule { From ab23c8bd9282f37683c5b4b7511f5f44a02db12e Mon Sep 17 00:00:00 2001 From: Hongze Zhang Date: Fri, 4 Apr 2025 15:17:49 +0200 Subject: [PATCH 14/14] fixup --- .../scala/org/apache/gluten/execution/OffloadKafkaScan.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala b/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala index 6b8ead8c3078..262fa82f20f9 100644 --- a/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala +++ b/gluten-kafka/src/main/scala/org/apache/gluten/execution/OffloadKafkaScan.scala @@ -39,7 +39,6 @@ object OffloadKafkaScan { injector.gluten.legacy.injectTransform { c => val offload = Seq(OffloadKafkaScan()) - .map(_.toStrcitRule()) HeuristicTransform.Simple( Validators.newValidator(c.glutenConf, offload), offload