Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
startOfQuery: Boolean = true): Unit = {
val resolver = conf.resolver
plan match {
case UnresolvedWith(child, relations) =>
case UnresolvedWith(child, relations, _) =>
val newNames = ArrayBuffer.empty[String]
newNames ++= outerCTERelationNames
relations.foreach {
Expand All @@ -149,7 +149,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
plan: LogicalPlan,
cteDefs: ArrayBuffer[CTERelationDef]): LogicalPlan = {
plan.resolveOperatorsUp {
case UnresolvedWith(child, relations) =>
case UnresolvedWith(child, relations, _) =>
val resolvedCTERelations =
resolveCTERelations(relations, isLegacy = true, forceInline = false, Seq.empty, cteDefs)
substituteCTE(child, alwaysInline = true, resolvedCTERelations)
Expand Down Expand Up @@ -202,7 +202,7 @@ object CTESubstitution extends Rule[LogicalPlan] {
var firstSubstituted: Option[LogicalPlan] = None
val newPlan = plan.resolveOperatorsDownWithPruning(
_.containsAnyPattern(UNRESOLVED_WITH, PLAN_EXPRESSION)) {
case UnresolvedWith(child: LogicalPlan, relations) =>
case UnresolvedWith(child: LogicalPlan, relations, _) =>
val resolvedCTERelations =
resolveCTERelations(relations, isLegacy = false, forceInline, outerCTEDefs, cteDefs) ++
outerCTEDefs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends Rule[LogicalPlan] {
private def pushdownPredicatesAndAttributes(
plan: LogicalPlan,
cteMap: CTEMap): LogicalPlan = plan.transformWithSubqueries {
case cteDef @ CTERelationDef(child, id, originalPlanWithPredicates, _) =>
case cteDef @ CTERelationDef(child, id, originalPlanWithPredicates, _, _, _) =>
val (_, _, newPreds, newAttrSet) = cteMap(id)
val originalPlan = originalPlanWithPredicates.map(_._1).getOrElse(child)
val preds = originalPlanWithPredicates.map(_._2).getOrElse(Seq.empty)
Expand All @@ -141,7 +141,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends Rule[LogicalPlan] {
cteDef
}

case cteRef @ CTERelationRef(cteId, _, output, _, _) =>
case cteRef @ CTERelationRef(cteId, _, output, _, _, _) =>
val (cteDef, _, _, newAttrSet) = cteMap(cteId)
if (needsPruning(cteDef.child, newAttrSet)) {
val indices = newAttrSet.toSeq.map(cteDef.output.indexOf)
Expand Down Expand Up @@ -170,7 +170,7 @@ object PushdownPredicatesAndPruneColumnsForCTEDef extends Rule[LogicalPlan] {
object CleanUpTempCTEInfo extends Rule[LogicalPlan] {
override def apply(plan: LogicalPlan): LogicalPlan =
plan.transformWithPruning(_.containsPattern(CTE)) {
case cteDef @ CTERelationDef(_, _, Some(_), _) =>
case cteDef @ CTERelationDef(_, _, Some(_), _, _, _) =>
cteDef.copy(originalPlanWithPredicates = None)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -801,10 +801,12 @@ object View {
* @param child The final query of this CTE.
* @param cteRelations A sequence of pair (alias, the CTE definition) that this CTE defined
* Each CTE can see the base tables and the previously defined CTEs only.
* @param allowRecursion A boolean flag if recursion is allowed.
*/
case class UnresolvedWith(
child: LogicalPlan,
cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode {
cteRelations: Seq[(String, SubqueryAlias)],
allowRecursion: Boolean = false) extends UnaryNode {
final override val nodePatterns: Seq[TreePattern] = Seq(UNRESOLVED_WITH)

override def output: Seq[Attribute] = child.output
Expand All @@ -830,12 +832,19 @@ case class UnresolvedWith(
* pushdown to help ensure rule idempotency.
* @param underSubquery If true, it means we don't need to add a shuffle for this CTE relation as
* subquery reuse will be applied to reuse CTE relation output.
* @param recursive If true, then this CTE Definition is recursive - it contains a self-reference.
* @param recursionAnchor A helper plan node that temporary stores the anchor term of recursive
* definitions. In the beginning of recursive resolution the `ResolveWithCTE`
* rule updates this parameter and once it is resolved the same rule resolves
* the recursive [[CTERelationRef]] references and removes this parameter.
*/
case class CTERelationDef(
child: LogicalPlan,
id: Long = CTERelationDef.newId,
originalPlanWithPredicates: Option[(LogicalPlan, Seq[Expression])] = None,
underSubquery: Boolean = false) extends UnaryNode {
underSubquery: Boolean = false,
recursive: Boolean = false,
recursionAnchor: Option[LogicalPlan] = None) extends UnaryNode {

final override val nodePatterns: Seq[TreePattern] = Seq(CTE)

Expand All @@ -859,13 +868,15 @@ object CTERelationDef {
* de-duplication.
* @param statsOpt The optional statistics inferred from the corresponding CTE
* definition.
* @param recursive If this is a recursive reference.
*/
case class CTERelationRef(
cteId: Long,
_resolved: Boolean,
override val output: Seq[Attribute],
override val isStreaming: Boolean,
statsOpt: Option[Statistics] = None) extends LeafNode with MultiInstanceRelation {
statsOpt: Option[Statistics] = None,
recursive: Boolean = false) extends LeafNode with MultiInstanceRelation {

final override val nodePatterns: Seq[TreePattern] = Seq(CTE)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ CREATE TABLE cte_tbl USING csv AS WITH s AS (SELECT 42 AS col) SELECT * FROM s
-- !query analysis
CreateDataSourceTableAsSelectCommand `spark_catalog`.`default`.`cte_tbl`, ErrorIfExists, [col]
+- WithCTE
:- CTERelationDef xxxx, false
:- CTERelationDef xxxx, false, false
: +- SubqueryAlias s
: +- Project [42 AS col#x]
: +- OneRowRelation
+- Project [col#x]
+- SubqueryAlias s
+- CTERelationRef xxxx, true, [col#x], false
+- CTERelationRef xxxx, true, [col#x], false, false


-- !query
Expand All @@ -26,13 +26,13 @@ CREATE TEMPORARY VIEW cte_view AS WITH s AS (SELECT 42 AS col) SELECT * FROM s
-- !query analysis
CreateViewCommand `cte_view`, WITH s AS (SELECT 42 AS col) SELECT * FROM s, false, false, LocalTempView, UNSUPPORTED, true
+- WithCTE
:- CTERelationDef xxxx, false
:- CTERelationDef xxxx, false, false
: +- SubqueryAlias s
: +- Project [42 AS col#x]
: +- OneRowRelation
+- Project [col#x]
+- SubqueryAlias s
+- CTERelationRef xxxx, true, [col#x], false
+- CTERelationRef xxxx, true, [col#x], false, false


-- !query
Expand All @@ -43,13 +43,13 @@ Project [col#x]
+- View (`cte_view`, [col#x])
+- Project [cast(col#x as int) AS col#x]
+- WithCTE
:- CTERelationDef xxxx, false
:- CTERelationDef xxxx, false, false
: +- SubqueryAlias s
: +- Project [42 AS col#x]
: +- OneRowRelation
+- Project [col#x]
+- SubqueryAlias s
+- CTERelationRef xxxx, true, [col#x], false
+- CTERelationRef xxxx, true, [col#x], false, false


-- !query
Expand All @@ -58,13 +58,13 @@ INSERT INTO cte_tbl SELECT * FROM S
-- !query analysis
InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
+- WithCTE
:- CTERelationDef xxxx, false
:- CTERelationDef xxxx, false, false
: +- SubqueryAlias s
: +- Project [43 AS col#x]
: +- OneRowRelation
+- Project [col#x]
+- SubqueryAlias S
+- CTERelationRef xxxx, true, [col#x], false
+- CTERelationRef xxxx, true, [col#x], false, false


-- !query
Expand All @@ -80,13 +80,13 @@ INSERT INTO cte_tbl WITH s AS (SELECT 44 AS col) SELECT * FROM s
-- !query analysis
InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/cte_tbl, false, CSV, [path=file:[not included in comparison]/{warehouse_dir}/cte_tbl], Append, `spark_catalog`.`default`.`cte_tbl`, org.apache.spark.sql.execution.datasources.InMemoryFileIndex(file:[not included in comparison]/{warehouse_dir}/cte_tbl), [col]
+- WithCTE
:- CTERelationDef xxxx, false
:- CTERelationDef xxxx, false, false
: +- SubqueryAlias s
: +- Project [44 AS col#x]
: +- OneRowRelation
+- Project [col#x]
+- SubqueryAlias s
+- CTERelationRef xxxx, true, [col#x], false
+- CTERelationRef xxxx, true, [col#x], false, false


-- !query
Expand Down
Loading
Loading