Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ cmake_install.cmake
build/
*-build/
Testing/
cmake-build-debug/
cmake-build-release/
cmake-build-*/
ep/_ep/

# Editor temporary/working/backup files #
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class VeloxDeltaComponent extends Component {
legacy.injectTransform {
c =>
val offload = Seq(OffloadDeltaScan(), OffloadDeltaProject(), OffloadDeltaFilter())
.map(_.toStrcitRule())
HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, offload), offload)
}
val offloads: Seq[RasOffload] = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class VeloxHudiComponent extends Component {
val ras = injector.gluten.ras
legacy.injectTransform {
c =>
val offload = Seq(OffloadHudiScan())
val offload = Seq(OffloadHudiScan()).map(_.toStrcitRule())
HeuristicTransform.Simple(Validators.newValidator(c.glutenConf, offload), offload)
}
ras.injectRasRule {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ object VeloxRuleApi {
injector.injectPreTransform(c => ArrowScanReplaceRule.apply(c.session))

// Legacy: The legacy transform rule.
val offloads = Seq(OffloadOthers(), OffloadExchange(), OffloadJoin())
val offloads = Seq(OffloadOthers(), OffloadExchange(), OffloadJoin()).map(_.toStrcitRule())
val validatorBuilder: GlutenConfig => Validator = conf =>
Validators.newValidator(conf, offloads)
val rewrites =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,17 @@
*/
package org.apache.gluten.extension.columnar.offload

import org.apache.gluten.execution.GlutenPlan
import org.apache.gluten.extension.columnar.transition.Convention

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.{LeafExecNode, SparkPlan}
import org.apache.spark.sql.vectorized.ColumnarBatch

/**
* Converts a vanilla Spark plan node into Gluten plan node. Gluten plan is supposed to be executed
Expand All @@ -29,3 +38,77 @@ import org.apache.spark.sql.execution.SparkPlan
trait OffloadSingleNode extends Logging {
/**
 * Attempts to convert a single vanilla Spark plan node into its Gluten counterpart.
 *
 * @param plan
 *   the query plan node to offload
 * @return
 *   the plan node produced by the rule. NOTE(review): presumably the input node is returned
 *   as-is when it cannot be offloaded -- confirm against the implementations.
 */
def offload(plan: SparkPlan): SparkPlan
}

object OffloadSingleNode {

  /** Extension methods available on every [[OffloadSingleNode]] rule. */
  implicit class OffloadSingleNodeOps(rule: OffloadSingleNode) {

    /**
     * Converts the [[OffloadSingleNode]] rule to a strict version.
     *
     * In the strict version of the rule, all children of the input query plan node are replaced
     * with 'DummyLeafExec' nodes before the rule body runs, so the children are not accessible
     * from the rule body. The real children are put back on the rule's output afterwards.
     *
     * @return
     *   a new rule wrapping this one
     */
    def toStrictRule(): OffloadSingleNode = new StrictRule(rule)

    /**
     * Misspelled alias of [[toStrictRule]], kept so that existing callers keep compiling. Prefer
     * [[toStrictRule]] in new code.
     */
    def toStrcitRule(): OffloadSingleNode = toStrictRule()
  }

  /**
   * Wraps `delegate` so that it only ever sees the input node with its children replaced by
   * 'DummyLeafExec' placeholders.
   */
  private class StrictRule(delegate: OffloadSingleNode) extends OffloadSingleNode {

    /**
     * Hides the children of `plan`, applies the wrapped rule, then swaps the hidden children back
     * into whatever plan the wrapped rule returned.
     */
    override def offload(plan: SparkPlan): SparkPlan = {
      val planWithChildrenHidden = hideChildren(plan)
      val applied = delegate.offload(planWithChildrenHidden)
      restoreHiddenChildren(applied)
    }

    /**
     * Replaces each child of `plan` with a 'DummyLeafExec' node wrapping it, so the original
     * children are not directly reachable from the returned node. The logical link of each child,
     * if any, is carried over to its placeholder.
     */
    private def hideChildren(plan: SparkPlan): SparkPlan = {
      val placeholders = plan.children.map {
        child =>
          val dummyLeaf = DummyLeafExec(child)
          child.logicalLink.foreach(dummyLeaf.setLogicalLink)
          dummyLeaf
      }
      plan.withNewChildren(placeholders)
    }

    /**
     * Restores hidden children by replacing every 'DummyLeafExec' found in `plan` with the plan
     * node it wraps. Nodes that are not placeholders are left untouched by `transformDown`.
     */
    private def restoreHiddenChildren(plan: SparkPlan): SparkPlan =
      plan.transformDown { case d: DummyLeafExec => d.hiddenPlan }
  }

  /**
   * The plan node that hides the real child plan node while a strict rule's `offload` is running.
   * This is used when the query planner doesn't allow a rule to access the child plan nodes of the
   * input query plan node.
   *
   * Output attributes, partitioning, ordering and the row/batch convention are forwarded from the
   * hidden plan; every execution entry point throws, since a placeholder is never meant to run.
   *
   * NOTE(review): the exception messages mention '#applyOnNode', which is not defined in this
   * file -- presumably a legacy name for the rule application; left unchanged.
   */
  private case class DummyLeafExec(hiddenPlan: SparkPlan) extends LeafExecNode with GlutenPlan {
    // Resolved lazily, on first access to the batch or row type.
    private lazy val conv: Convention = Convention.get(hiddenPlan)

    override def batchType(): Convention.BatchType = conv.batchType
    override def rowType0(): Convention.RowType = conv.rowType
    override def output: Seq[Attribute] = hiddenPlan.output
    override def outputPartitioning: Partitioning = hiddenPlan.outputPartitioning
    override def outputOrdering: Seq[SortOrder] = hiddenPlan.outputOrdering

    override def doExecute(): RDD[InternalRow] =
      throw new UnsupportedOperationException("Not allowed in #applyOnNode call")
    override def doExecuteColumnar(): RDD[ColumnarBatch] =
      throw new UnsupportedOperationException("Not allowed in #applyOnNode call")
    override def doExecuteBroadcast[T](): Broadcast[T] =
      throw new UnsupportedOperationException("Not allowed in #applyOnNode call")
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ object OffloadOthers {
// Utility to replace single node within transformed Gluten node.
// Children will be preserved as they are as children of the output node.
//
// Do not look up on children on the input node in this rule. Otherwise
// Do not look up on children on the input node in this rule. Otherwise,
// it may break RAS which would group all the possible input nodes to
// search for valid candidates.
private class ReplaceSingleNode extends LogLevelUtil with Logging {
Expand Down
Loading