diff --git a/.gitignore b/.gitignore
index f402cb1e83e0..ab86334c8ebd 100644
--- a/.gitignore
+++ b/.gitignore
@@ -56,8 +56,7 @@ cmake_install.cmake
 build/
 *-build/
 Testing/
-cmake-build-debug/
-cmake-build-release/
+cmake-build-*/
 ep/_ep/
 
 # Editor temporary/working/backup files #
diff --git a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
index 4e5ec88c00ac..d157d7b72532 100644
--- a/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
+++ b/backends-velox/src-delta/main/scala/org/apache/gluten/component/VeloxDeltaComponent.scala
@@ -37,6 +37,7 @@ class VeloxDeltaComponent extends Component {
     legacy.injectTransform {
       c =>
         val offload = Seq(OffloadDeltaScan(), OffloadDeltaProject(), OffloadDeltaFilter())
+          .map(_.toStrcitRule())
         HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, offload), offload)
     }
     val offloads: Seq[RasOffload] = Seq(
diff --git a/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala b/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
index c9eeabcdfe01..b11645efbd50 100644
--- a/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
+++ b/backends-velox/src-hudi/main/scala/org/apache/gluten/component/VeloxHudiComponent.scala
@@ -36,7 +36,7 @@ class VeloxHudiComponent extends Component {
     val ras = injector.gluten.ras
     legacy.injectTransform {
       c =>
-        val offload = Seq(OffloadHudiScan())
+        val offload = Seq(OffloadHudiScan()).map(_.toStrcitRule())
         HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, offload), offload)
     }
     ras.injectRasRule {
diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
index 82a20b8cc56c..04dfbf255898 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala
@@ -74,7 +74,7 @@ object VeloxRuleApi {
     injector.injectPreTransform(c => ArrowScanReplaceRule.apply(c.session))
 
     // Legacy: The legacy transform rule.
-    val offloads = Seq(OffloadOthers(), OffloadExchange(), OffloadJoin())
+    val offloads = Seq(OffloadOthers(), OffloadExchange(), OffloadJoin()).map(_.toStrcitRule())
     val validatorBuilder: GlutenConfig => Validator = conf =>
       Validators.newValidator(conf, offloads)
     val rewrites =
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala
index 232973f53a5d..4cf3e1d0c95d 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNode.scala
@@ -16,8 +16,17 @@
  */
 package org.apache.gluten.extension.columnar.offload
 
+import org.apache.gluten.execution.GlutenPlan
+import org.apache.gluten.extension.columnar.transition.Convention
+
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
+import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
+import org.apache.spark.sql.vectorized.ColumnarBatch
 
 /**
  * Converts a vanilla Spark plan node into Gluten plan node. Gluten plan is supposed to be executed
@@ -29,3 +38,78 @@ import org.apache.spark.sql.execution.SparkPlan
 trait OffloadSingleNode extends Logging {
   def offload(plan: SparkPlan): SparkPlan
 }
+
+object OffloadSingleNode {
+  implicit class OffloadSingleNodeOps(rule: OffloadSingleNode) {
+
+    /**
+     * Converts the [[OffloadSingleNode]] rule to a strict version.
+     *
+     * In the strict version of the rule, all children of the input query plan node will be replaced
+     * with 'DummyLeafExec' nodes so they are not accessible from the rule body.
+     */
+    def toStrcitRule(): OffloadSingleNode = {
+      new StrictRule(rule)
+    }
+  }
+
+  /** Runs the delegate rule with the input's children replaced by 'DummyLeafExec' nodes. */
+  private class StrictRule(delegate: OffloadSingleNode) extends OffloadSingleNode {
+    override def offload(plan: SparkPlan): SparkPlan = {
+      val planWithChildrenHidden = hideChildren(plan)
+      val applied = delegate.offload(planWithChildrenHidden)
+      // Swap the real children back in for any placeholders left in the delegate's output.
+      restoreHiddenChildren(applied)
+    }
+
+    /**
+     * Replaces the children with 'DummyLeafExec' nodes so the delegate rule cannot access them.
+     * Each dummy node keeps the original child so it can be restored after the rule is applied.
+     */
+    private def hideChildren[T <: SparkPlan](plan: T): T = {
+      plan
+        .withNewChildren(
+          plan.children.map {
+            child =>
+              val dummyLeaf = DummyLeafExec(child)
+              child.logicalLink.foreach(dummyLeaf.setLogicalLink)
+              dummyLeaf
+          }
+        )
+        .asInstanceOf[T]
+    }
+
+    /** Restores hidden children from the replaced 'DummyLeafExec' nodes. */
+    private def restoreHiddenChildren[T <: SparkPlan](plan: T): T = {
+      plan
+        .transformDown {
+          case d: DummyLeafExec =>
+            d.hiddenPlan
+          case other => other
+        }
+        .asInstanceOf[T]
+    }
+  }
+
+  /**
+   * A placeholder leaf node that hides the real child plan node while a strict rule is applied.
+   * It is used when the query planner doesn't allow a rule to access the child plan nodes of the
+   * input query plan node. It reports the hidden plan's properties but cannot be executed.
+   */
+  private case class DummyLeafExec(hiddenPlan: SparkPlan) extends LeafExecNode with GlutenPlan {
+    private lazy val conv: Convention = Convention.get(hiddenPlan)
+
+    override def batchType(): Convention.BatchType = conv.batchType
+    override def rowType0(): Convention.RowType = conv.rowType
+    override def output: Seq[Attribute] = hiddenPlan.output
+    override def outputPartitioning: Partitioning = hiddenPlan.outputPartitioning
+    override def outputOrdering: Seq[SortOrder] = hiddenPlan.outputOrdering
+
+    override def doExecute(): RDD[InternalRow] =
+      throw new UnsupportedOperationException("Not allowed in #applyOnNode call")
+    override def doExecuteColumnar(): RDD[ColumnarBatch] =
+      throw new UnsupportedOperationException("Not allowed in #applyOnNode call")
+    override def doExecuteBroadcast[T](): Broadcast[T] =
+      throw new UnsupportedOperationException("Not allowed in #applyOnNode call")
+  }
+}
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
index d356a39b4103..611b2a8ff71f 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/offload/OffloadSingleNodeRules.scala
@@ -184,7 +184,7 @@ object OffloadOthers {
   // Utility to replace single node within transformed Gluten node.
   // Children will be preserved as they are as children of the output node.
   //
-  // Do not look up on children on the input node in this rule. Otherwise
+  // Do not look up on children on the input node in this rule. Otherwise,
   // it may break RAS which would group all the possible input nodes to
   // search for validate candidates.
   private class ReplaceSingleNode extends LogLevelUtil with Logging {