From c151b60af43bca8f25414736657a276c1576df57 Mon Sep 17 00:00:00 2001
From: Angerszhuuuu
Date: Wed, 17 Jan 2024 17:00:56 +0800
Subject: [PATCH 01/21] [SPARK-46741][SQL] Cache Table with CTE won't work

---
 .../plans/logical/basicLogicalOperators.scala |  8 +++
 .../catalyst/plans/logical/v2Commands.scala   |  7 +-
 .../sql-tests/analyzer-results/cache.sql.out  | 56 +++++++++++++++
 .../test/resources/sql-tests/inputs/cache.sql | 14 ++++
 .../resources/sql-tests/results/cache.sql.out | 71 +++++++++++++++++++
 5 files changed, 155 insertions(+), 1 deletion(-)
 create mode 100644 sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out
 create mode 100644 sql/core/src/test/resources/sql-tests/inputs/cache.sql
 create mode 100644 sql/core/src/test/resources/sql-tests/results/cache.sql.out

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index fbbae16130c01..cbabd73580a15 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -842,6 +842,10 @@ case class CTERelationDef(
     copy(child = newChild)
 
   override def output: Seq[Attribute] = if (resolved) child.output else Nil
+
+  override def doCanonicalize(): LogicalPlan = {
+    copy(child = child.canonicalized, id = 0)
+  }
 }
 
 object CTERelationDef {
@@ -885,6 +889,10 @@ case class CTERelationRef(
   def withNewStats(statsOpt: Option[Statistics]): CTERelationRef = copy(statsOpt = statsOpt)
 
   override def computeStats(): Statistics = statsOpt.getOrElse(Statistics(conf.defaultSizeInBytes))
+
+  override def doCanonicalize(): LogicalPlan = {
+    super.doCanonicalize().asInstanceOf[CTERelationRef].copy(cteId = 0)
+  }
 }
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index b179268189000..f1d93b6c26f73 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -1357,7 +1357,8 @@ case class CacheTableAsSelect(
     isLazy: Boolean,
     options: Map[String, String],
     isAnalyzed: Boolean = false,
-    referredTempFunctions: Seq[String] = Seq.empty) extends AnalysisOnlyCommand {
+    referredTempFunctions: Seq[String] = Seq.empty)
+  extends AnalysisOnlyCommand with CTEInChildren {
   override protected def withNewChildrenInternal(
       newChildren: IndexedSeq[LogicalPlan]): CacheTableAsSelect = {
     assert(!isAnalyzed)
@@ -1372,6 +1373,10 @@ case class CacheTableAsSelect(
       // Collect the referred temporary functions from AnalysisContext
       referredTempFunctions = ac.referredTempFunctionNames.toSeq)
   }
+
+  override def withCTEDefs(cteDefs: Seq[CTERelationDef]): LogicalPlan = {
+    copy(plan = WithCTE(plan, cteDefs))
+  }
 }
 
 /**
diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out
new file mode 100644
index 0000000000000..7ae4937b05eac
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out
@@ -0,0 +1,56 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES 0, 1, 2 AS t(id)
+-- !query analysis
+CreateViewCommand 
`t1`, SELECT * FROM VALUES 0, 1, 2 AS t(id), false, false, LocalTempView, true + +- Project [id#x] + +- SubqueryAlias t + +- LocalRelation [id#x] + + +-- !query +CACHE TABLE cache_table +WITH +t2 AS (SELECT 1) +SELECT * FROM t2 +-- !query analysis +CacheTableAsSelect cache_table, WITH +t2 AS (SELECT 1) +SELECT * FROM t2, false, true + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias t2 + : +- Project [1 AS 1#x] + : +- OneRowRelation + +- Project [1#x] + +- SubqueryAlias t2 + +- CTERelationRef xxxx, true, [1#x], false + + +-- !query +SELECT * FROM cache_table +-- !query analysis +Project [1#x] ++- SubqueryAlias cache_table + +- View (`cache_table`, [1#x]) + +- Project [cast(1#x as int) AS 1#x] + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias t2 + : +- Project [1 AS 1#x] + : +- OneRowRelation + +- Project [1#x] + +- SubqueryAlias t2 + +- CTERelationRef xxxx, true, [1#x], false + + +-- !query +EXPLAIN EXTENDED SELECT * FROM cache_table +-- !query analysis +ExplainCommand 'Project [*], ExtendedMode + + +-- !query +DROP TABLE IF EXISTS t1 +-- !query analysis +DropTempViewCommand t1 diff --git a/sql/core/src/test/resources/sql-tests/inputs/cache.sql b/sql/core/src/test/resources/sql-tests/inputs/cache.sql new file mode 100644 index 0000000000000..ebc8fa2bc591b --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/cache.sql @@ -0,0 +1,14 @@ +CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES 0, 1, 2 AS t(id); + + +CACHE TABLE cache_table +WITH +t2 AS (SELECT 1) +SELECT * FROM t2; + +SELECT * FROM cache_table; + +EXPLAIN EXTENDED SELECT * FROM cache_table; + +-- Clean up +DROP TABLE IF EXISTS t1; diff --git a/sql/core/src/test/resources/sql-tests/results/cache.sql.out b/sql/core/src/test/resources/sql-tests/results/cache.sql.out new file mode 100644 index 0000000000000..cc58837a5f4ca --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/cache.sql.out @@ -0,0 +1,71 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES 0, 1, 2 AS t(id) +-- !query schema +struct<> +-- !query output + + + +-- !query +CACHE TABLE cache_table +WITH +t2 AS (SELECT 1) +SELECT * FROM t2 +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT * FROM cache_table +-- !query schema +struct<1:int> +-- !query output +1 + + +-- !query +EXPLAIN EXTENDED SELECT * FROM cache_table +-- !query schema +struct +-- !query output +== Parsed Logical Plan == +'Project [*] ++- 'UnresolvedRelation [cache_table], [], false + +== Analyzed Logical Plan == +1: int +Project [1#x] ++- SubqueryAlias cache_table + +- View (`cache_table`, [1#x]) + +- Project [cast(1#x as int) AS 1#x] + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias t2 + : +- Project [1 AS 1#x] + : +- OneRowRelation + +- Project [1#x] + +- SubqueryAlias t2 + +- CTERelationRef xxxx, true, [1#x], false + +== Optimized Logical Plan == +InMemoryRelation [1#x], StorageLevel(disk, memory, deserialized, 1 replicas) + +- *Project [1 AS 1#x] + +- *Scan OneRowRelation[] + +== Physical Plan == +AdaptiveSparkPlan isFinalPlan=false ++- Scan In-memory table cache_table [1#x] + +- InMemoryRelation [1#x], StorageLevel(disk, memory, deserialized, 1 replicas) + +- *Project [1 AS 1#x] + +- *Scan OneRowRelation[] + + +-- !query +DROP TABLE IF EXISTS t1 +-- !query schema +struct<> +-- !query output + From b9711ab19fc05d68a0bc8c53862c825ef77e57d1 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 17 Jan 2024 19:05:22 +0800 Subject: [PATCH 02/21] Update 
basicLogicalOperators.scala --- .../plans/logical/basicLogicalOperators.scala | 32 ++++++++++++++----- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index cbabd73580a15..229246aae43fe 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -842,10 +842,6 @@ case class CTERelationDef( copy(child = newChild) override def output: Seq[Attribute] = if (resolved) child.output else Nil - - override def doCanonicalize(): LogicalPlan = { - copy(child = child.canonicalized, id = 0) - } } object CTERelationDef { @@ -889,10 +885,6 @@ case class CTERelationRef( def withNewStats(statsOpt: Option[Statistics]): CTERelationRef = copy(statsOpt = statsOpt) override def computeStats(): Statistics = statsOpt.getOrElse(Statistics(conf.defaultSizeInBytes)) - - override def doCanonicalize(): LogicalPlan = { - super.doCanonicalize().asInstanceOf[CTERelationRef].copy(cteId = 0) - } } /** @@ -918,6 +910,30 @@ case class WithCTE(plan: LogicalPlan, cteDefs: Seq[CTERelationDef]) extends Logi def withNewPlan(newPlan: LogicalPlan): WithCTE = { withNewChildren(children.init :+ newPlan).asInstanceOf[WithCTE] } + + override def doCanonicalize(): LogicalPlan = { + val canonicalized = super.doCanonicalize().asInstanceOf[WithCTE] + val defIndex = canonicalized.cteDefs.map(_.id).zipWithIndex.toMap + + def canonicalizeCTE(plan: LogicalPlan): LogicalPlan = { + plan.resolveOperatorsUpWithPruning( + _.containsAnyPattern(CTE, PLAN_EXPRESSION)) { + case ref: CTERelationRef => + ref.copy(cteId = defIndex(ref.cteId).toLong) + + case other => + other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) { + case e: SubqueryExpression => e.withNewPlan(canonicalizeCTE(e.plan)) + } + } + } + + canonicalized.copy( + plan = canonicalizeCTE(canonicalized.plan), + cteDefs = canonicalized.cteDefs.map { cteDef => + cteDef.copy(id = defIndex(cteDef.id).toLong) + }) + } } /** From 38c68d8586db89875b825fd4f9396e576675dada Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Fri, 19 Jan 2024 15:39:55 +0800 Subject: [PATCH 03/21] Update basicLogicalOperators.scala --- .../spark/sql/catalyst/plans/logical/basicLogicalOperators.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 229246aae43fe..2fc32ed37ce8e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -895,6 +895,7 @@ case class CTERelationRef( * @param cteDefs The CTE definitions. 
*/ case class WithCTE(plan: LogicalPlan, cteDefs: Seq[CTERelationDef]) extends LogicalPlan { + assert(plan.find(_.isInstanceOf[WithCTE]).isEmpty, "Spark didn't support nested WithCTE.") final override val nodePatterns: Seq[TreePattern] = Seq(CTE) From d587e6d02064c98a07eca95841beda7a29997fec Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Tue, 23 Jan 2024 18:04:44 +0800 Subject: [PATCH 04/21] update --- .../plans/logical/basicLogicalOperators.scala | 6 +- .../sql-tests/analyzer-results/cache.sql.out | 136 +++++++++++++++++- .../test/resources/sql-tests/inputs/cache.sql | 23 ++- .../resources/sql-tests/results/cache.sql.out | 127 +++++++++++++++- 4 files changed, 283 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 2fc32ed37ce8e..7fada80aa0423 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -895,7 +895,6 @@ case class CTERelationRef( * @param cteDefs The CTE definitions. */ case class WithCTE(plan: LogicalPlan, cteDefs: Seq[CTERelationDef]) extends LogicalPlan { - assert(plan.find(_.isInstanceOf[WithCTE]).isEmpty, "Spark didn't support nested WithCTE.") final override val nodePatterns: Seq[TreePattern] = Seq(CTE) @@ -919,7 +918,10 @@ case class WithCTE(plan: LogicalPlan, cteDefs: Seq[CTERelationDef]) extends Logi def canonicalizeCTE(plan: LogicalPlan): LogicalPlan = { plan.resolveOperatorsUpWithPruning( _.containsAnyPattern(CTE, PLAN_EXPRESSION)) { - case ref: CTERelationRef => + + // For nested WithCTE, if defIndex didn't contain the cteId, + // means it's not current WithCTE's ref. 
+ case ref: CTERelationRef if defIndex.contains(ref.cteId) => ref.copy(cteId = defIndex(ref.cteId).toLong) case other => diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out index 7ae4937b05eac..f43cc75a7298f 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out @@ -1,11 +1,37 @@ -- Automatically generated by SQLQueryTestSuite -- !query -CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES 0, 1, 2 AS t(id) +CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (0, 0), (1, 1), (2, 2) AS t(c1, c2) -- !query analysis -CreateViewCommand `t1`, SELECT * FROM VALUES 0, 1, 2 AS t(id), false, false, LocalTempView, true - +- Project [id#x] +CreateViewCommand `t1`, SELECT * FROM VALUES (0, 0), (1, 1), (2, 2) AS t(c1, c2), false, false, LocalTempView, true + +- Project [c1#x, c2#x] +- SubqueryAlias t - +- LocalRelation [id#x] + +- LocalRelation [c1#x, c2#x] + + +-- !query +CREATE TEMPORARY VIEW t2 AS +WITH v as ( + SELECT c1 + c1 c3 FROM t1 +) +SELECT SUM(c3) s FROM v +-- !query analysis +CreateViewCommand `t2`, WITH v as ( + SELECT c1 + c1 c3 FROM t1 +) +SELECT SUM(c3) s FROM v, false, false, LocalTempView, true + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias v + : +- Project [(c1#x + c1#x) AS c3#x] + : +- SubqueryAlias t1 + : +- View (`t1`, [c1#x, c2#x]) + : +- Project [cast(c1#x as int) AS c1#x, cast(c2#x as int) AS c2#x] + : +- Project [c1#x, c2#x] + : +- SubqueryAlias t + : +- LocalRelation [c1#x, c2#x] + +- Aggregate [sum(c3#x) AS s#xL] + +- SubqueryAlias v + +- CTERelationRef xxxx, true, [c3#x], false -- !query @@ -50,7 +76,109 @@ EXPLAIN EXTENDED SELECT * FROM cache_table ExplainCommand 'Project [*], ExtendedMode +-- !query +CACHE TABLE cache_nested_cte_table +WITH +v AS ( + SELECT c1 * c2 c3 from t1 +) +SELECT SUM(c3) FROM v +EXCEPT +SELECT s FROM t2 +-- !query analysis +CacheTableAsSelect cache_nested_cte_table, WITH +v AS ( + SELECT c1 * c2 c3 from t1 +) +SELECT SUM(c3) FROM v +EXCEPT +SELECT s FROM t2, false, true + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias v + : +- Project [(c1#x * c2#x) AS c3#x] + : +- SubqueryAlias t1 + : +- View (`t1`, [c1#x, c2#x]) + : +- Project [cast(c1#x as int) AS c1#x, cast(c2#x as int) AS c2#x] + : +- Project [c1#x, c2#x] + : +- SubqueryAlias t + : +- LocalRelation [c1#x, c2#x] + +- Except false + :- Aggregate [sum(c3#x) AS sum(c3)#xL] + : +- SubqueryAlias v + : +- CTERelationRef xxxx, true, [c3#x], false + +- Project [s#xL] + +- SubqueryAlias t2 + +- View (`t2`, [s#xL]) + +- Project [cast(s#xL as bigint) AS s#xL] + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias v + : +- Project [(c1#x + c1#x) AS c3#x] + : +- SubqueryAlias t1 + : +- View (`t1`, [c1#x, c2#x]) + : +- Project [cast(c1#x as int) AS c1#x, cast(c2#x as int) AS c2#x] + : +- Project [c1#x, c2#x] + : +- SubqueryAlias t + : +- LocalRelation [c1#x, c2#x] + +- Aggregate [sum(c3#x) AS s#xL] + +- SubqueryAlias v + +- CTERelationRef xxxx, true, [c3#x], false + + +-- !query +SELECT * FROM cache_nested_cte_table +-- !query analysis +Project [sum(c3)#xL] ++- SubqueryAlias cache_nested_cte_table + +- View (`cache_nested_cte_table`, [sum(c3)#xL]) + +- Project [cast(sum(c3)#xL as bigint) AS sum(c3)#xL] + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias v + : +- Project [(c1#x * c2#x) AS c3#x] + : +- SubqueryAlias t1 + : +- View (`t1`, [c1#x, c2#x]) + : +- 
Project [cast(c1#x as int) AS c1#x, cast(c2#x as int) AS c2#x] + : +- Project [c1#x, c2#x] + : +- SubqueryAlias t + : +- LocalRelation [c1#x, c2#x] + +- Except false + :- Aggregate [sum(c3#x) AS sum(c3)#xL] + : +- SubqueryAlias v + : +- CTERelationRef xxxx, true, [c3#x], false + +- Project [s#xL] + +- SubqueryAlias t2 + +- View (`t2`, [s#xL]) + +- Project [cast(s#xL as bigint) AS s#xL] + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias v + : +- Project [(c1#x + c1#x) AS c3#x] + : +- SubqueryAlias t1 + : +- View (`t1`, [c1#x, c2#x]) + : +- Project [cast(c1#x as int) AS c1#x, cast(c2#x as int) AS c2#x] + : +- Project [c1#x, c2#x] + : +- SubqueryAlias t + : +- LocalRelation [c1#x, c2#x] + +- Aggregate [sum(c3#x) AS s#xL] + +- SubqueryAlias v + +- CTERelationRef xxxx, true, [c3#x], false + + +-- !query +EXPLAIN EXTENDED SELECT * FROM cache_nested_cte_table +-- !query analysis +ExplainCommand 'Project [*], ExtendedMode + + -- !query DROP TABLE IF EXISTS t1 -- !query analysis DropTempViewCommand t1 + + +-- !query +DROP TABLE IF EXISTS t2 +-- !query analysis +DropTempViewCommand t2 diff --git a/sql/core/src/test/resources/sql-tests/inputs/cache.sql b/sql/core/src/test/resources/sql-tests/inputs/cache.sql index ebc8fa2bc591b..b3588ce49e596 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/cache.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/cache.sql @@ -1,5 +1,9 @@ -CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES 0, 1, 2 AS t(id); - +CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (0, 0), (1, 1), (2, 2) AS t(c1, c2); +CREATE TEMPORARY VIEW t2 AS +WITH v as ( + SELECT c1 + c1 c3 FROM t1 +) +SELECT SUM(c3) s FROM v; CACHE TABLE cache_table WITH @@ -10,5 +14,20 @@ SELECT * FROM cache_table; EXPLAIN EXTENDED SELECT * FROM cache_table; +-- Nested WithCTE +CACHE TABLE cache_nested_cte_table +WITH +v AS ( + SELECT c1 * c2 c3 from t1 +) +SELECT SUM(c3) FROM v +EXCEPT +SELECT s FROM t2; + +SELECT * FROM cache_nested_cte_table; + +EXPLAIN EXTENDED SELECT * FROM cache_nested_cte_table; + -- Clean up DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; diff --git a/sql/core/src/test/resources/sql-tests/results/cache.sql.out b/sql/core/src/test/resources/sql-tests/results/cache.sql.out index cc58837a5f4ca..59e6712392d55 100644 --- a/sql/core/src/test/resources/sql-tests/results/cache.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/cache.sql.out @@ -1,6 +1,18 @@ -- Automatically generated by SQLQueryTestSuite -- !query -CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES 0, 1, 2 AS t(id) +CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (0, 0), (1, 1), (2, 2) AS t(c1, c2) +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE TEMPORARY VIEW t2 AS +WITH v as ( + SELECT c1 + c1 c3 FROM t1 +) +SELECT SUM(c3) s FROM v -- !query schema struct<> -- !query output @@ -63,9 +75,122 @@ AdaptiveSparkPlan isFinalPlan=false +- *Scan OneRowRelation[] +-- !query +CACHE TABLE cache_nested_cte_table +WITH +v AS ( + SELECT c1 * c2 c3 from t1 +) +SELECT SUM(c3) FROM v +EXCEPT +SELECT s FROM t2 +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT * FROM cache_nested_cte_table +-- !query schema +struct +-- !query output +5 + + +-- !query +EXPLAIN EXTENDED SELECT * FROM cache_nested_cte_table +-- !query schema +struct +-- !query output +== Parsed Logical Plan == +'Project [*] ++- 'UnresolvedRelation [cache_nested_cte_table], [], false + +== Analyzed Logical Plan == +sum(c3): bigint +Project [sum(c3)#xL] ++- SubqueryAlias cache_nested_cte_table + +- View 
(`cache_nested_cte_table`, [sum(c3)#xL]) + +- Project [cast(sum(c3)#xL as bigint) AS sum(c3)#xL] + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias v + : +- Project [(c1#x * c2#x) AS c3#x] + : +- SubqueryAlias t1 + : +- View (`t1`, [c1#x, c2#x]) + : +- Project [cast(c1#x as int) AS c1#x, cast(c2#x as int) AS c2#x] + : +- Project [c1#x, c2#x] + : +- SubqueryAlias t + : +- LocalRelation [c1#x, c2#x] + +- Except false + :- Aggregate [sum(c3#x) AS sum(c3)#xL] + : +- SubqueryAlias v + : +- CTERelationRef xxxx, true, [c3#x], false + +- Project [s#xL] + +- SubqueryAlias t2 + +- View (`t2`, [s#xL]) + +- Project [cast(s#xL as bigint) AS s#xL] + +- WithCTE + :- CTERelationDef xxxx, false + : +- SubqueryAlias v + : +- Project [(c1#x + c1#x) AS c3#x] + : +- SubqueryAlias t1 + : +- View (`t1`, [c1#x, c2#x]) + : +- Project [cast(c1#x as int) AS c1#x, cast(c2#x as int) AS c2#x] + : +- Project [c1#x, c2#x] + : +- SubqueryAlias t + : +- LocalRelation [c1#x, c2#x] + +- Aggregate [sum(c3#x) AS s#xL] + +- SubqueryAlias v + +- CTERelationRef xxxx, true, [c3#x], false + +== Optimized Logical Plan == +InMemoryRelation [sum(c3)#xL], StorageLevel(disk, memory, deserialized, 1 replicas) + +- AdaptiveSparkPlan isFinalPlan=false + +- BroadcastHashJoin [coalesce(sum(c3)#xL, 0), isnull(sum(c3)#xL)], [coalesce(s#xL, 0), isnull(s#xL)], LeftAnti, BuildRight, false + :- HashAggregate(keys=[], functions=[sum(c3#x)], output=[sum(c3)#xL]) + : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + : +- HashAggregate(keys=[], functions=[partial_sum(c3#x)], output=[sum#xL]) + : +- Project [(c1#x * c2#x) AS c3#x] + : +- LocalTableScan [c1#x, c2#x] + +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), isnull(input[0, bigint, true])),false), [plan_id=x] + +- HashAggregate(keys=[], functions=[sum(c3#x)], output=[s#xL]) + +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + +- HashAggregate(keys=[], functions=[partial_sum(c3#x)], output=[sum#xL]) + +- Project [(c1#x + c1#x) AS c3#x] + +- LocalTableScan [c1#x, c2#x] + +== Physical Plan == +AdaptiveSparkPlan isFinalPlan=false ++- Scan In-memory table cache_nested_cte_table [sum(c3)#xL] + +- InMemoryRelation [sum(c3)#xL], StorageLevel(disk, memory, deserialized, 1 replicas) + +- AdaptiveSparkPlan isFinalPlan=false + +- BroadcastHashJoin [coalesce(sum(c3)#xL, 0), isnull(sum(c3)#xL)], [coalesce(s#xL, 0), isnull(s#xL)], LeftAnti, BuildRight, false + :- HashAggregate(keys=[], functions=[sum(c3#x)], output=[sum(c3)#xL]) + : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + : +- HashAggregate(keys=[], functions=[partial_sum(c3#x)], output=[sum#xL]) + : +- Project [(c1#x * c2#x) AS c3#x] + : +- LocalTableScan [c1#x, c2#x] + +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), isnull(input[0, bigint, true])),false), [plan_id=x] + +- HashAggregate(keys=[], functions=[sum(c3#x)], output=[s#xL]) + +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + +- HashAggregate(keys=[], functions=[partial_sum(c3#x)], output=[sum#xL]) + +- Project [(c1#x + c1#x) AS c3#x] + +- LocalTableScan [c1#x, c2#x] + + -- !query DROP TABLE IF EXISTS t1 -- !query schema struct<> -- !query output + + +-- !query +DROP TABLE IF EXISTS t2 +-- !query schema +struct<> +-- !query output + From bb15d32d0aa5c83c70d8f96187b51d941362e304 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 8 May 2024 10:31:09 +0800 Subject: [PATCH 05/21] Follow comment to add normalize rule --- 
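Note: the rule below renumbers CTERelationDef ids to per-plan sequential values, so
two analyzed plans that differ only in the globally allocated CTE ids normalize to
the same tree; cache lookup compares normalized plans, which is what lets a later
SELECT over the cached relation hit the InMemoryRelation. A rough standalone sketch
of the id-remapping idea on a toy plan model (Scan, Def, Ref and With here are
hypothetical stand-ins for the Catalyst classes, not Spark API):

    import java.util.concurrent.atomic.AtomicLong

    // Toy stand-ins for LogicalPlan / CTERelationDef / CTERelationRef.
    sealed trait Node
    case class Scan(table: String) extends Node
    case class Def(id: Long, child: Node) extends Node
    case class Ref(cteId: Long) extends Node
    case class With(plan: Node, defs: Seq[Def]) extends Node

    // Single-level mirror of the rule: give the defs fresh sequential ids and
    // rewrite only the refs that point at this WithCTE's own defs (the real
    // rule also recurses into nested WithCTE nodes and subquery plans).
    def normalize(node: Node): Node = node match {
      case With(plan, defs) =>
        val curId = new AtomicLong()
        val idMap = defs.map(d => d.id -> curId.getAndIncrement()).toMap
        def remap(n: Node): Node = n match {
          case Ref(id) if idMap.contains(id) => Ref(idMap(id))
          case Def(id, c)  => Def(id, remap(c))
          case With(p, ds) => With(remap(p), ds.map(d => Def(d.id, remap(d.child))))
          case other       => other
        }
        With(remap(plan), defs.map(d => Def(idMap(d.id), remap(d.child))))
      case other => other
    }

    // Plans that differ only in their original CTE ids become equal:
    val a = With(Ref(7), Seq(Def(7, Scan("t"))))
    val b = With(Ref(42), Seq(Def(42, Scan("t"))))
    assert(normalize(a) == normalize(b))
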
.../normalizer/WithCTENormalized.scala | 57 +++++++++++++++++++ .../plans/logical/basicLogicalOperators.scala | 27 --------- .../spark/sql/execution/QueryExecution.scala | 48 +++++++++------- .../internal/BaseSessionStateBuilder.scala | 2 + 4 files changed, 86 insertions(+), 48 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala new file mode 100644 index 0000000000000..9e7675b649627 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.normalizer + +import org.apache.spark.sql.catalyst.expressions.SubqueryExpression +import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect, CTERelationRef, LogicalPlan, WithCTE} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.{CTE, PLAN_EXPRESSION} + +object WithCTENormalized extends Rule[LogicalPlan]{ + override def apply(plan: LogicalPlan): LogicalPlan = { + val curId = new java.util.concurrent.atomic.AtomicLong() + plan transformDown { + + case ctas @ CacheTableAsSelect(_, plan, _, _, _, _, _) => + ctas.copy(plan = apply(plan)) + + case withCTE @ WithCTE(plan, cteDefs) => + val defIdToNewId = withCTE.cteDefs.map(_.id).map((_, curId.getAndIncrement())).toMap + val normalizedPlan = canonicalizeCTE(plan, defIdToNewId) + val newCteDefs = cteDefs.map { cteDef => + val normalizedCteDef = canonicalizeCTE(cteDef.child, defIdToNewId) + cteDef.copy(child = normalizedCteDef, id = defIdToNewId(cteDef.id)) + } + withCTE.copy(plan = normalizedPlan, cteDefs = newCteDefs) + } + } + + def canonicalizeCTE(plan: LogicalPlan, defIdToNewId: Map[Long, Long]): LogicalPlan = { + plan.transformDownWithPruning( + _.containsAnyPattern(CTE, PLAN_EXPRESSION)) { + // For nested WithCTE, if defIndex didn't contain the cteId, + // means it's not current WithCTE's ref. 
+ case ref: CTERelationRef if defIdToNewId.contains(ref.cteId) => + ref.copy(cteId = defIdToNewId(ref.cteId)) + case other => + other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) { + case e: SubqueryExpression => e.withNewPlan(canonicalizeCTE(e.plan, defIdToNewId)) + } + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index b323c3d404f88..7c2dfd31f4e33 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -910,33 +910,6 @@ case class WithCTE(plan: LogicalPlan, cteDefs: Seq[CTERelationDef]) extends Logi def withNewPlan(newPlan: LogicalPlan): WithCTE = { withNewChildren(children.init :+ newPlan).asInstanceOf[WithCTE] } - - override def doCanonicalize(): LogicalPlan = { - val canonicalized = super.doCanonicalize().asInstanceOf[WithCTE] - val defIndex = canonicalized.cteDefs.map(_.id).zipWithIndex.toMap - - def canonicalizeCTE(plan: LogicalPlan): LogicalPlan = { - plan.resolveOperatorsUpWithPruning( - _.containsAnyPattern(CTE, PLAN_EXPRESSION)) { - - // For nested WithCTE, if defIndex didn't contain the cteId, - // means it's not current WithCTE's ref. - case ref: CTERelationRef if defIndex.contains(ref.cteId) => - ref.copy(cteId = defIndex(ref.cteId).toLong) - - case other => - other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) { - case e: SubqueryExpression => e.withNewPlan(canonicalizeCTE(e.plan)) - } - } - } - - canonicalized.copy( - plan = canonicalizeCTE(canonicalized.plan), - cteDefs = canonicalized.cteDefs.map { cteDef => - cteDef.copy(id = defIndex(cteDef.id).toLong) - }) - } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index a2cfad800e006..b2ac132f15005 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -91,10 +91,33 @@ class QueryExecution( plan } + // The plan that has been normalized by custom rules, so that it's more likely to hit cache. 
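+  // Note: `commandExecuted` below is now derived from `normalized`, so eagerly
+  // executed commands (e.g. CACHE TABLE ... AS SELECT) capture the normalized plan.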
+ lazy val normalized: LogicalPlan = { + val normalizationRules = sparkSession.sessionState.planNormalizationRules + if (normalizationRules.isEmpty) { + analyzed + } else { + try { + val planChangeLogger = new PlanChangeLogger[LogicalPlan]() + val normalized = normalizationRules.foldLeft(analyzed) { (p, rule) => + val result = rule.apply(p) + planChangeLogger.logRule(rule.ruleName, p, result) + result + } + planChangeLogger.logBatch("Plan Normalization", analyzed, normalized) + normalized + } catch { + case e: Exception => + e.printStackTrace() + throw e + } + } + } + lazy val commandExecuted: LogicalPlan = mode match { - case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands) - case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed) - case CommandExecutionMode.SKIP => analyzed + case CommandExecutionMode.NON_ROOT => normalized.mapChildren(eagerlyExecuteCommands) + case CommandExecutionMode.ALL => eagerlyExecuteCommands(normalized) + case CommandExecutionMode.SKIP => normalized } private def commandExecutionName(command: Command): String = command match { @@ -128,29 +151,12 @@ class QueryExecution( case other => other } - // The plan that has been normalized by custom rules, so that it's more likely to hit cache. - lazy val normalized: LogicalPlan = { - val normalizationRules = sparkSession.sessionState.planNormalizationRules - if (normalizationRules.isEmpty) { - commandExecuted - } else { - val planChangeLogger = new PlanChangeLogger[LogicalPlan]() - val normalized = normalizationRules.foldLeft(commandExecuted) { (p, rule) => - val result = rule.apply(p) - planChangeLogger.logRule(rule.ruleName, p, result) - result - } - planChangeLogger.logBatch("Plan Normalization", commandExecuted, normalized) - normalized - } - } - lazy val withCachedData: LogicalPlan = sparkSession.withActive { assertAnalyzed() assertSupported() // clone the plan to avoid sharing the plan instance between different stages like analyzing, // optimizing and planning. 
- sparkSession.sharedState.cacheManager.useCachedData(normalized.clone()) + sparkSession.sharedState.cacheManager.useCachedData(commandExecuted.clone()) } def assertCommandExecuted(): Unit = commandExecuted diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 00c72294ca071..091f148036d6d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.artifact.ArtifactManager import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, FunctionRegistry, ReplaceCharWithVarchar, ResolveSessionCatalog, TableFunctionRegistry} import org.apache.spark.sql.catalyst.catalog.{FunctionExpressionBuilder, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.normalizer.WithCTENormalized import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -337,6 +338,7 @@ abstract class BaseSessionStateBuilder( } protected def planNormalizationRules: Seq[Rule[LogicalPlan]] = { + WithCTENormalized +: extensions.buildPlanNormalizationRules(session) } From c9017ef1fc302b91888c4f484e8fceb0198f4663 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 8 May 2024 20:49:32 +0800 Subject: [PATCH 06/21] Update QueryExecution.scala --- .../spark/sql/execution/QueryExecution.scala | 31 +++---------------- 1 file changed, 4 insertions(+), 27 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index eeddde5e2f191..357484ca19df2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -95,33 +95,10 @@ class QueryExecution( plan } - // The plan that has been normalized by custom rules, so that it's more likely to hit cache. 
- lazy val normalized: LogicalPlan = { - val normalizationRules = sparkSession.sessionState.planNormalizationRules - if (normalizationRules.isEmpty) { - analyzed - } else { - try { - val planChangeLogger = new PlanChangeLogger[LogicalPlan]() - val normalized = normalizationRules.foldLeft(analyzed) { (p, rule) => - val result = rule.apply(p) - planChangeLogger.logRule(rule.ruleName, p, result) - result - } - planChangeLogger.logBatch("Plan Normalization", analyzed, normalized) - normalized - } catch { - case e: Exception => - e.printStackTrace() - throw e - } - } - } - lazy val commandExecuted: LogicalPlan = mode match { - case CommandExecutionMode.NON_ROOT => normalized.mapChildren(eagerlyExecuteCommands) - case CommandExecutionMode.ALL => eagerlyExecuteCommands(normalized) - case CommandExecutionMode.SKIP => normalized + case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands) + case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed) + case CommandExecutionMode.SKIP => analyzed } private def commandExecutionName(command: Command): String = command match { @@ -165,7 +142,7 @@ class QueryExecution( assertSupported() // clone the plan to avoid sharing the plan instance between different stages like analyzing, // optimizing and planning. - sparkSession.sharedState.cacheManager.useCachedData(commandExecuted.clone()) + sparkSession.sharedState.cacheManager.useCachedData(normalized.clone()) } def assertCommandExecuted(): Unit = commandExecuted From 4aa6aeeb7f3b542695102c2874394236d0478437 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 8 May 2024 20:59:51 +0800 Subject: [PATCH 07/21] Update cache.sql.out --- .../resources/sql-tests/results/cache.sql.out | 51 +++++++++++++++---- 1 file changed, 42 insertions(+), 9 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/results/cache.sql.out b/sql/core/src/test/resources/sql-tests/results/cache.sql.out index 59e6712392d55..fcc405c8b7b18 100644 --- a/sql/core/src/test/resources/sql-tests/results/cache.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/cache.sql.out @@ -68,11 +68,10 @@ InMemoryRelation [1#x], StorageLevel(disk, memory, deserialized, 1 replicas) +- *Scan OneRowRelation[] == Physical Plan == -AdaptiveSparkPlan isFinalPlan=false -+- Scan In-memory table cache_table [1#x] - +- InMemoryRelation [1#x], StorageLevel(disk, memory, deserialized, 1 replicas) - +- *Project [1 AS 1#x] - +- *Scan OneRowRelation[] +Scan In-memory table cache_table [1#x] + +- InMemoryRelation [1#x], StorageLevel(disk, memory, deserialized, 1 replicas) + +- *Project [1 AS 1#x] + +- *Scan OneRowRelation[] -- !query @@ -147,8 +146,25 @@ Project [sum(c3)#xL] == Optimized Logical Plan == InMemoryRelation [sum(c3)#xL], StorageLevel(disk, memory, deserialized, 1 replicas) - +- AdaptiveSparkPlan isFinalPlan=false - +- BroadcastHashJoin [coalesce(sum(c3)#xL, 0), isnull(sum(c3)#xL)], [coalesce(s#xL, 0), isnull(s#xL)], LeftAnti, BuildRight, false + +- AdaptiveSparkPlan isFinalPlan=true + +- == Final Plan == + *BroadcastHashJoin [coalesce(sum(c3)#xL, 0), isnull(sum(c3)#xL)], [coalesce(s#xL, 0), isnull(s#xL)], LeftAnti, BuildRight, false + :- *HashAggregate(keys=[], functions=[sum(c3#x)], output=[sum(c3)#xL]) + : +- ShuffleQueryStage 0 + : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + : +- *HashAggregate(keys=[], functions=[partial_sum(c3#x)], output=[sum#xL]) + : +- *Project [(c1#x * c2#x) AS c3#x] + : +- *LocalTableScan [c1#x, c2#x] + +- BroadcastQueryStage 2 + +- BroadcastExchange 
HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), isnull(input[0, bigint, true])),false), [plan_id=x] + +- *HashAggregate(keys=[], functions=[sum(c3#x)], output=[s#xL]) + +- ShuffleQueryStage 1 + +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + +- *HashAggregate(keys=[], functions=[partial_sum(c3#x)], output=[sum#xL]) + +- *Project [(c1#x + c1#x) AS c3#x] + +- *LocalTableScan [c1#x, c2#x] + +- == Initial Plan == + BroadcastHashJoin [coalesce(sum(c3)#xL, 0), isnull(sum(c3)#xL)], [coalesce(s#xL, 0), isnull(s#xL)], LeftAnti, BuildRight, false :- HashAggregate(keys=[], functions=[sum(c3#x)], output=[sum(c3)#xL]) : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] : +- HashAggregate(keys=[], functions=[partial_sum(c3#x)], output=[sum#xL]) @@ -165,8 +181,25 @@ InMemoryRelation [sum(c3)#xL], StorageLevel(disk, memory, deserialized, 1 replic AdaptiveSparkPlan isFinalPlan=false +- Scan In-memory table cache_nested_cte_table [sum(c3)#xL] +- InMemoryRelation [sum(c3)#xL], StorageLevel(disk, memory, deserialized, 1 replicas) - +- AdaptiveSparkPlan isFinalPlan=false - +- BroadcastHashJoin [coalesce(sum(c3)#xL, 0), isnull(sum(c3)#xL)], [coalesce(s#xL, 0), isnull(s#xL)], LeftAnti, BuildRight, false + +- AdaptiveSparkPlan isFinalPlan=true + +- == Final Plan == + *BroadcastHashJoin [coalesce(sum(c3)#xL, 0), isnull(sum(c3)#xL)], [coalesce(s#xL, 0), isnull(s#xL)], LeftAnti, BuildRight, false + :- *HashAggregate(keys=[], functions=[sum(c3#x)], output=[sum(c3)#xL]) + : +- ShuffleQueryStage 0 + : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + : +- *HashAggregate(keys=[], functions=[partial_sum(c3#x)], output=[sum#xL]) + : +- *Project [(c1#x * c2#x) AS c3#x] + : +- *LocalTableScan [c1#x, c2#x] + +- BroadcastQueryStage 2 + +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), isnull(input[0, bigint, true])),false), [plan_id=x] + +- *HashAggregate(keys=[], functions=[sum(c3#x)], output=[s#xL]) + +- ShuffleQueryStage 1 + +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + +- *HashAggregate(keys=[], functions=[partial_sum(c3#x)], output=[sum#xL]) + +- *Project [(c1#x + c1#x) AS c3#x] + +- *LocalTableScan [c1#x, c2#x] + +- == Initial Plan == + BroadcastHashJoin [coalesce(sum(c3)#xL, 0), isnull(sum(c3)#xL)], [coalesce(s#xL, 0), isnull(s#xL)], LeftAnti, BuildRight, false :- HashAggregate(keys=[], functions=[sum(c3#x)], output=[sum(c3)#xL]) : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] : +- HashAggregate(keys=[], functions=[partial_sum(c3#x)], output=[sum#xL]) From 054231f8a0afc3c162dea02acfe32c19c95ed1b3 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Tue, 14 May 2024 15:21:12 +0800 Subject: [PATCH 08/21] follow comment --- .../normalizer/WithCTENormalized.scala | 57 ------------------- .../plans/logical/basicLogicalOperators.scala | 27 +++++++++ .../internal/BaseSessionStateBuilder.scala | 2 - 3 files changed, 27 insertions(+), 59 deletions(-) delete mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala deleted file mode 100644 index 9e7675b649627..0000000000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation 
(ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst.normalizer - -import org.apache.spark.sql.catalyst.expressions.SubqueryExpression -import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect, CTERelationRef, LogicalPlan, WithCTE} -import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.trees.TreePattern.{CTE, PLAN_EXPRESSION} - -object WithCTENormalized extends Rule[LogicalPlan]{ - override def apply(plan: LogicalPlan): LogicalPlan = { - val curId = new java.util.concurrent.atomic.AtomicLong() - plan transformDown { - - case ctas @ CacheTableAsSelect(_, plan, _, _, _, _, _) => - ctas.copy(plan = apply(plan)) - - case withCTE @ WithCTE(plan, cteDefs) => - val defIdToNewId = withCTE.cteDefs.map(_.id).map((_, curId.getAndIncrement())).toMap - val normalizedPlan = canonicalizeCTE(plan, defIdToNewId) - val newCteDefs = cteDefs.map { cteDef => - val normalizedCteDef = canonicalizeCTE(cteDef.child, defIdToNewId) - cteDef.copy(child = normalizedCteDef, id = defIdToNewId(cteDef.id)) - } - withCTE.copy(plan = normalizedPlan, cteDefs = newCteDefs) - } - } - - def canonicalizeCTE(plan: LogicalPlan, defIdToNewId: Map[Long, Long]): LogicalPlan = { - plan.transformDownWithPruning( - _.containsAnyPattern(CTE, PLAN_EXPRESSION)) { - // For nested WithCTE, if defIndex didn't contain the cteId, - // means it's not current WithCTE's ref. 
- case ref: CTERelationRef if defIdToNewId.contains(ref.cteId) => - ref.copy(cteId = defIdToNewId(ref.cteId)) - case other => - other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) { - case e: SubqueryExpression => e.withNewPlan(canonicalizeCTE(e.plan, defIdToNewId)) - } - } - } -} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 9242a06cf1d6e..4426f18615100 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -911,6 +911,33 @@ case class WithCTE(plan: LogicalPlan, cteDefs: Seq[CTERelationDef]) extends Logi def withNewPlan(newPlan: LogicalPlan): WithCTE = { withNewChildren(children.init :+ newPlan).asInstanceOf[WithCTE] } + + override def doCanonicalize(): LogicalPlan = { + val canonicalized = super.doCanonicalize().asInstanceOf[WithCTE] + val defIndex = canonicalized.cteDefs.map(_.id).zipWithIndex.toMap + + def canonicalizeCTE(plan: LogicalPlan): LogicalPlan = { + plan.resolveOperatorsUpWithPruning( + _.containsAnyPattern(CTE, PLAN_EXPRESSION)) { + + // For nested WithCTE, if defIndex didn't contain the cteId, + // means it's not current WithCTE's ref. + case ref: CTERelationRef if defIndex.contains(ref.cteId) => + ref.copy(cteId = defIndex(ref.cteId).toLong) + + case other => + other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) { + case e: SubqueryExpression => e.withNewPlan(canonicalizeCTE(e.plan)) + } + } + } + + canonicalized.copy( + plan = canonicalizeCTE(canonicalized.plan), + cteDefs = canonicalized.cteDefs.map { cteDef => + cteDef.copy(id = defIndex(cteDef.id).toLong) + }) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 503001f03ff95..4660970814e21 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -22,7 +22,6 @@ import org.apache.spark.sql.artifact.ArtifactManager import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, FunctionRegistry, ReplaceCharWithVarchar, ResolveSessionCatalog, TableFunctionRegistry} import org.apache.spark.sql.catalyst.catalog.{FunctionExpressionBuilder, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.normalizer.WithCTENormalized import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -340,7 +339,6 @@ abstract class BaseSessionStateBuilder( } protected def planNormalizationRules: Seq[Rule[LogicalPlan]] = { - WithCTENormalized +: extensions.buildPlanNormalizationRules(session) } From 92a213d75934b10d5763eb7f0297d736168ddd62 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Tue, 14 May 2024 15:27:47 +0800 Subject: [PATCH 09/21] Update cache.sql.out --- .../test/resources/sql-tests/analyzer-results/cache.sql.out | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out index f43cc75a7298f..4b6e3a20d65a1 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out @@ -2,7 +2,7 @@ -- !query CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (0, 0), (1, 1), (2, 2) AS t(c1, c2) -- !query analysis -CreateViewCommand `t1`, SELECT * FROM VALUES (0, 0), (1, 1), (2, 2) AS t(c1, c2), false, false, LocalTempView, true +CreateViewCommand `t1`, SELECT * FROM VALUES (0, 0), (1, 1), (2, 2) AS t(c1, c2), false, false, LocalTempView, UNSUPPORTED, true +- Project [c1#x, c2#x] +- SubqueryAlias t +- LocalRelation [c1#x, c2#x] @@ -18,7 +18,7 @@ SELECT SUM(c3) s FROM v CreateViewCommand `t2`, WITH v as ( SELECT c1 + c1 c3 FROM t1 ) -SELECT SUM(c3) s FROM v, false, false, LocalTempView, true +SELECT SUM(c3) s FROM v, false, false, LocalTempView, UNSUPPORTED, true +- WithCTE :- CTERelationDef xxxx, false : +- SubqueryAlias v From 123a9861856b82c692eaddf792d5460a835e3a0b Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Tue, 14 May 2024 17:05:41 +0800 Subject: [PATCH 10/21] Revert "follow comment" This reverts commit 054231f8a0afc3c162dea02acfe32c19c95ed1b3. --- .../normalizer/WithCTENormalized.scala | 57 +++++++++++++++++++ .../plans/logical/basicLogicalOperators.scala | 27 --------- .../internal/BaseSessionStateBuilder.scala | 2 + 3 files changed, 59 insertions(+), 27 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala new file mode 100644 index 0000000000000..9e7675b649627 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.normalizer + +import org.apache.spark.sql.catalyst.expressions.SubqueryExpression +import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect, CTERelationRef, LogicalPlan, WithCTE} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.{CTE, PLAN_EXPRESSION} + +object WithCTENormalized extends Rule[LogicalPlan]{ + override def apply(plan: LogicalPlan): LogicalPlan = { + val curId = new java.util.concurrent.atomic.AtomicLong() + plan transformDown { + + case ctas @ CacheTableAsSelect(_, plan, _, _, _, _, _) => + ctas.copy(plan = apply(plan)) + + case withCTE @ WithCTE(plan, cteDefs) => + val defIdToNewId = withCTE.cteDefs.map(_.id).map((_, curId.getAndIncrement())).toMap + val normalizedPlan = canonicalizeCTE(plan, defIdToNewId) + val newCteDefs = cteDefs.map { cteDef => + val normalizedCteDef = canonicalizeCTE(cteDef.child, defIdToNewId) + cteDef.copy(child = normalizedCteDef, id = defIdToNewId(cteDef.id)) + } + withCTE.copy(plan = normalizedPlan, cteDefs = newCteDefs) + } + } + + def canonicalizeCTE(plan: LogicalPlan, defIdToNewId: Map[Long, Long]): LogicalPlan = { + plan.transformDownWithPruning( + _.containsAnyPattern(CTE, PLAN_EXPRESSION)) { + // For nested WithCTE, if defIndex didn't contain the cteId, + // means it's not current WithCTE's ref. + case ref: CTERelationRef if defIdToNewId.contains(ref.cteId) => + ref.copy(cteId = defIdToNewId(ref.cteId)) + case other => + other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) { + case e: SubqueryExpression => e.withNewPlan(canonicalizeCTE(e.plan, defIdToNewId)) + } + } + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 4426f18615100..9242a06cf1d6e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -911,33 +911,6 @@ case class WithCTE(plan: LogicalPlan, cteDefs: Seq[CTERelationDef]) extends Logi def withNewPlan(newPlan: LogicalPlan): WithCTE = { withNewChildren(children.init :+ newPlan).asInstanceOf[WithCTE] } - - override def doCanonicalize(): LogicalPlan = { - val canonicalized = super.doCanonicalize().asInstanceOf[WithCTE] - val defIndex = canonicalized.cteDefs.map(_.id).zipWithIndex.toMap - - def canonicalizeCTE(plan: LogicalPlan): LogicalPlan = { - plan.resolveOperatorsUpWithPruning( - _.containsAnyPattern(CTE, PLAN_EXPRESSION)) { - - // For nested WithCTE, if defIndex didn't contain the cteId, - // means it's not current WithCTE's ref. 
- case ref: CTERelationRef if defIndex.contains(ref.cteId) => - ref.copy(cteId = defIndex(ref.cteId).toLong) - - case other => - other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) { - case e: SubqueryExpression => e.withNewPlan(canonicalizeCTE(e.plan)) - } - } - } - - canonicalized.copy( - plan = canonicalizeCTE(canonicalized.plan), - cteDefs = canonicalized.cteDefs.map { cteDef => - cteDef.copy(id = defIndex(cteDef.id).toLong) - }) - } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 4660970814e21..503001f03ff95 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.artifact.ArtifactManager import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, FunctionRegistry, ReplaceCharWithVarchar, ResolveSessionCatalog, TableFunctionRegistry} import org.apache.spark.sql.catalyst.catalog.{FunctionExpressionBuilder, SessionCatalog} import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.normalizer.WithCTENormalized import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -339,6 +340,7 @@ abstract class BaseSessionStateBuilder( } protected def planNormalizationRules: Seq[Rule[LogicalPlan]] = { + WithCTENormalized +: extensions.buildPlanNormalizationRules(session) } From 5448b738ce3f17325d689f627d32443d33c25ade Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Fri, 5 Dec 2025 11:27:44 +0800 Subject: [PATCH 11/21] Update BaseSessionStateBuilder.scala --- .../apache/spark/sql/internal/BaseSessionStateBuilder.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index d0318f248d206..f2cd4f3f36532 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -22,9 +22,8 @@ import org.apache.spark.sql.artifact.ArtifactManager import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTravel, FunctionRegistry, InvokeProcedures, ReplaceCharWithVarchar, ResolveDataSource, ResolveEventTimeWatermark, ResolveExecuteImmediate, ResolveSessionCatalog, ResolveTranspose, TableFunctionRegistry} import org.apache.spark.sql.catalyst.analysis.resolver.ResolverExtension import org.apache.spark.sql.catalyst.catalog.{FunctionExpressionBuilder, SessionCatalog} -import org.apache.spark.sql.catalyst.expressions.Expression -import org.apache.spark.sql.catalyst.normalizer.WithCTENormalized import org.apache.spark.sql.catalyst.expressions.{Expression, ExtractSemiStructuredFields} +import org.apache.spark.sql.catalyst.normalizer.WithCTENormalized import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan From 99ee735ea3acc7594908d353484d2cabddff7a1f Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Fri, 5 Dec 2025 11:32:14 +0800 Subject: [PATCH 
12/21] update --- .../sql-tests/analyzer-results/cache.sql.out | 14 +- .../resources/sql-tests/results/cache.sql.out | 170 ++++++++++++------ 2 files changed, 124 insertions(+), 60 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out index 4b6e3a20d65a1..e93da8b86b890 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out @@ -31,7 +31,7 @@ SELECT SUM(c3) s FROM v, false, false, LocalTempView, UNSUPPORTED, true : +- LocalRelation [c1#x, c2#x] +- Aggregate [sum(c3#x) AS s#xL] +- SubqueryAlias v - +- CTERelationRef xxxx, true, [c3#x], false + +- CTERelationRef xxxx, true, [c3#x], false, false -- !query @@ -50,7 +50,7 @@ SELECT * FROM t2, false, true : +- OneRowRelation +- Project [1#x] +- SubqueryAlias t2 - +- CTERelationRef xxxx, true, [1#x], false + +- CTERelationRef xxxx, true, [1#x], false, false, 1 -- !query @@ -67,7 +67,7 @@ Project [1#x] : +- OneRowRelation +- Project [1#x] +- SubqueryAlias t2 - +- CTERelationRef xxxx, true, [1#x], false + +- CTERelationRef xxxx, true, [1#x], false, false, 1 -- !query @@ -106,7 +106,7 @@ SELECT s FROM t2, false, true +- Except false :- Aggregate [sum(c3#x) AS sum(c3)#xL] : +- SubqueryAlias v - : +- CTERelationRef xxxx, true, [c3#x], false + : +- CTERelationRef xxxx, true, [c3#x], false, false +- Project [s#xL] +- SubqueryAlias t2 +- View (`t2`, [s#xL]) @@ -123,7 +123,7 @@ SELECT s FROM t2, false, true : +- LocalRelation [c1#x, c2#x] +- Aggregate [sum(c3#x) AS s#xL] +- SubqueryAlias v - +- CTERelationRef xxxx, true, [c3#x], false + +- CTERelationRef xxxx, true, [c3#x], false, false -- !query @@ -146,7 +146,7 @@ Project [sum(c3)#xL] +- Except false :- Aggregate [sum(c3#x) AS sum(c3)#xL] : +- SubqueryAlias v - : +- CTERelationRef xxxx, true, [c3#x], false + : +- CTERelationRef xxxx, true, [c3#x], false, false +- Project [s#xL] +- SubqueryAlias t2 +- View (`t2`, [s#xL]) @@ -163,7 +163,7 @@ Project [sum(c3)#xL] : +- LocalRelation [c1#x, c2#x] +- Aggregate [sum(c3#x) AS s#xL] +- SubqueryAlias v - +- CTERelationRef xxxx, true, [c3#x], false + +- CTERelationRef xxxx, true, [c3#x], false, false -- !query diff --git a/sql/core/src/test/resources/sql-tests/results/cache.sql.out b/sql/core/src/test/resources/sql-tests/results/cache.sql.out index fcc405c8b7b18..f7c3ec6d9f031 100644 --- a/sql/core/src/test/resources/sql-tests/results/cache.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/cache.sql.out @@ -60,7 +60,7 @@ Project [1#x] : +- OneRowRelation +- Project [1#x] +- SubqueryAlias t2 - +- CTERelationRef xxxx, true, [1#x], false + +- CTERelationRef xxxx, true, [1#x], false, false, 1 == Optimized Logical Plan == InMemoryRelation [1#x], StorageLevel(disk, memory, deserialized, 1 replicas) @@ -125,7 +125,7 @@ Project [sum(c3)#xL] +- Except false :- Aggregate [sum(c3#x) AS sum(c3)#xL] : +- SubqueryAlias v - : +- CTERelationRef xxxx, true, [c3#x], false + : +- CTERelationRef xxxx, true, [c3#x], false, false +- Project [s#xL] +- SubqueryAlias t2 +- View (`t2`, [s#xL]) @@ -142,40 +142,72 @@ Project [sum(c3)#xL] : +- LocalRelation [c1#x, c2#x] +- Aggregate [sum(c3#x) AS s#xL] +- SubqueryAlias v - +- CTERelationRef xxxx, true, [c3#x], false + +- CTERelationRef xxxx, true, [c3#x], false, false == Optimized Logical Plan == InMemoryRelation [sum(c3)#xL], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=true +- == 
Final Plan == - *BroadcastHashJoin [coalesce(sum(c3)#xL, 0), isnull(sum(c3)#xL)], [coalesce(s#xL, 0), isnull(s#xL)], LeftAnti, BuildRight, false - :- *HashAggregate(keys=[], functions=[sum(c3#x)], output=[sum(c3)#xL]) - : +- ShuffleQueryStage 0 - : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] - : +- *HashAggregate(keys=[], functions=[partial_sum(c3#x)], output=[sum#xL]) - : +- *Project [(c1#x * c2#x) AS c3#x] - : +- *LocalTableScan [c1#x, c2#x] - +- BroadcastQueryStage 2 - +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), isnull(input[0, bigint, true])),false), [plan_id=x] - +- *HashAggregate(keys=[], functions=[sum(c3#x)], output=[s#xL]) - +- ShuffleQueryStage 1 - +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] - +- *HashAggregate(keys=[], functions=[partial_sum(c3#x)], output=[sum#xL]) - +- *Project [(c1#x + c1#x) AS c3#x] - +- *LocalTableScan [c1#x, c2#x] + ResultQueryStage 1 + +- *BroadcastHashJoin [coalesce(sum(c3)#xL, 0), isnull(sum(c3)#xL)], [coalesce(s#xL, 0), isnull(s#xL)], LeftAnti, BuildRight, false + :- *Project [ReusedSubquery Subquery subquery#x, [id=#x].sum(c3) AS sum(c3)#xL] + : : +- ReusedSubquery Subquery subquery#x, [id=#x] + : +- *Scan OneRowRelation[] + +- BroadcastQueryStage 0 + +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), isnull(input[0, bigint, true])),false), [plan_id=x] + +- *Project [Subquery subquery#x, [id=#x].s AS s#xL] + : +- Subquery subquery#x, [id=#x] + : +- AdaptiveSparkPlan isFinalPlan=true + +- == Final Plan == + ResultQueryStage 1 + +- *Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] + +- *HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) + +- ShuffleQueryStage 0 + +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + +- *HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) + +- *Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] + +- *LocalTableScan [c1#x, c2#x] + +- == Initial Plan == + Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] + +- HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) + +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + +- HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) + +- Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] + +- LocalTableScan [c1#x, c2#x] + +- *Scan OneRowRelation[] +- == Initial Plan == BroadcastHashJoin [coalesce(sum(c3)#xL, 0), isnull(sum(c3)#xL)], [coalesce(s#xL, 0), isnull(s#xL)], LeftAnti, BuildRight, false - :- HashAggregate(keys=[], functions=[sum(c3#x)], output=[sum(c3)#xL]) - : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] - : +- HashAggregate(keys=[], functions=[partial_sum(c3#x)], output=[sum#xL]) - : +- Project [(c1#x * c2#x) AS c3#x] - : +- LocalTableScan [c1#x, c2#x] + :- Project [Subquery subquery#x, [id=#x].sum(c3) AS sum(c3)#xL] + : : +- Subquery subquery#x, [id=#x] + : : +- AdaptiveSparkPlan isFinalPlan=false + : : +- Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] + : : +- HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) + : : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + : : +- HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) + : : +- Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] + : : +- 
LocalTableScan [c1#x, c2#x] + : +- Scan OneRowRelation[] +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), isnull(input[0, bigint, true])),false), [plan_id=x] - +- HashAggregate(keys=[], functions=[sum(c3#x)], output=[s#xL]) - +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] - +- HashAggregate(keys=[], functions=[partial_sum(c3#x)], output=[sum#xL]) - +- Project [(c1#x + c1#x) AS c3#x] - +- LocalTableScan [c1#x, c2#x] + +- Project [Subquery subquery#x, [id=#x].s AS s#xL] + : +- Subquery subquery#x, [id=#x] + : +- AdaptiveSparkPlan isFinalPlan=true + +- == Final Plan == + ResultQueryStage 1 + +- *Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] + +- *HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) + +- ShuffleQueryStage 0 + +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + +- *HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) + +- *Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] + +- *LocalTableScan [c1#x, c2#x] + +- == Initial Plan == + Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] + +- HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) + +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + +- HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) + +- Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] + +- LocalTableScan [c1#x, c2#x] + +- Scan OneRowRelation[] == Physical Plan == AdaptiveSparkPlan isFinalPlan=false @@ -183,34 +215,66 @@ AdaptiveSparkPlan isFinalPlan=false +- InMemoryRelation [sum(c3)#xL], StorageLevel(disk, memory, deserialized, 1 replicas) +- AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == - *BroadcastHashJoin [coalesce(sum(c3)#xL, 0), isnull(sum(c3)#xL)], [coalesce(s#xL, 0), isnull(s#xL)], LeftAnti, BuildRight, false - :- *HashAggregate(keys=[], functions=[sum(c3#x)], output=[sum(c3)#xL]) - : +- ShuffleQueryStage 0 - : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] - : +- *HashAggregate(keys=[], functions=[partial_sum(c3#x)], output=[sum#xL]) - : +- *Project [(c1#x * c2#x) AS c3#x] - : +- *LocalTableScan [c1#x, c2#x] - +- BroadcastQueryStage 2 - +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), isnull(input[0, bigint, true])),false), [plan_id=x] - +- *HashAggregate(keys=[], functions=[sum(c3#x)], output=[s#xL]) - +- ShuffleQueryStage 1 - +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] - +- *HashAggregate(keys=[], functions=[partial_sum(c3#x)], output=[sum#xL]) - +- *Project [(c1#x + c1#x) AS c3#x] - +- *LocalTableScan [c1#x, c2#x] + ResultQueryStage 1 + +- *BroadcastHashJoin [coalesce(sum(c3)#xL, 0), isnull(sum(c3)#xL)], [coalesce(s#xL, 0), isnull(s#xL)], LeftAnti, BuildRight, false + :- *Project [ReusedSubquery Subquery subquery#x, [id=#x].sum(c3) AS sum(c3)#xL] + : : +- ReusedSubquery Subquery subquery#x, [id=#x] + : +- *Scan OneRowRelation[] + +- BroadcastQueryStage 0 + +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), isnull(input[0, bigint, true])),false), [plan_id=x] + +- *Project [Subquery subquery#x, [id=#x].s AS s#xL] + : +- Subquery subquery#x, [id=#x] + : +- AdaptiveSparkPlan isFinalPlan=true + +- == Final Plan == + ResultQueryStage 1 + +- *Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] + +- *HashAggregate(keys=[], 
functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) + +- ShuffleQueryStage 0 + +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + +- *HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) + +- *Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] + +- *LocalTableScan [c1#x, c2#x] + +- == Initial Plan == + Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] + +- HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) + +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + +- HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) + +- Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] + +- LocalTableScan [c1#x, c2#x] + +- *Scan OneRowRelation[] +- == Initial Plan == BroadcastHashJoin [coalesce(sum(c3)#xL, 0), isnull(sum(c3)#xL)], [coalesce(s#xL, 0), isnull(s#xL)], LeftAnti, BuildRight, false - :- HashAggregate(keys=[], functions=[sum(c3#x)], output=[sum(c3)#xL]) - : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] - : +- HashAggregate(keys=[], functions=[partial_sum(c3#x)], output=[sum#xL]) - : +- Project [(c1#x * c2#x) AS c3#x] - : +- LocalTableScan [c1#x, c2#x] + :- Project [Subquery subquery#x, [id=#x].sum(c3) AS sum(c3)#xL] + : : +- Subquery subquery#x, [id=#x] + : : +- AdaptiveSparkPlan isFinalPlan=false + : : +- Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] + : : +- HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) + : : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + : : +- HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) + : : +- Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] + : : +- LocalTableScan [c1#x, c2#x] + : +- Scan OneRowRelation[] +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), isnull(input[0, bigint, true])),false), [plan_id=x] - +- HashAggregate(keys=[], functions=[sum(c3#x)], output=[s#xL]) - +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] - +- HashAggregate(keys=[], functions=[partial_sum(c3#x)], output=[sum#xL]) - +- Project [(c1#x + c1#x) AS c3#x] - +- LocalTableScan [c1#x, c2#x] + +- Project [Subquery subquery#x, [id=#x].s AS s#xL] + : +- Subquery subquery#x, [id=#x] + : +- AdaptiveSparkPlan isFinalPlan=true + +- == Final Plan == + ResultQueryStage 1 + +- *Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] + +- *HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) + +- ShuffleQueryStage 0 + +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + +- *HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) + +- *Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] + +- *LocalTableScan [c1#x, c2#x] + +- == Initial Plan == + Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] + +- HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) + +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] + +- HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) + +- Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] + +- LocalTableScan [c1#x, c2#x] + +- Scan OneRowRelation[] -- !query From 6a92b32214c8aae0c7d91c734c4b34b8111794e4 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Fri, 5 Dec 2025 15:35:18 +0800 Subject: 
[PATCH 13/21] Update cte-recursion.sql.out --- .../sql-tests/results/cte-recursion.sql.out | 270 ++---------------- 1 file changed, 27 insertions(+), 243 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out index 536fc6c4ea635..f38474db4af89 100644 --- a/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out @@ -341,68 +341,16 @@ WITH RECURSIVE t(n) MAX RECURSION LEVEL 100 AS ( ) SELECT * FROM t LIMIT ALL -- !query schema -struct +struct<> -- !query output -1 -10 -11 -12 -13 -14 -15 -16 -17 -18 -19 -2 -20 -21 -22 -23 -24 -25 -26 -27 -28 -29 -3 -30 -31 -32 -33 -34 -35 -36 -37 -38 -39 -4 -40 -41 -42 -43 -44 -45 -46 -47 -48 -49 -5 -50 -51 -52 -53 -54 -55 -56 -57 -58 -59 -6 -60 -7 -8 -9 +org.apache.spark.SparkException +{ + "errorClass" : "RECURSION_ROW_LIMIT_EXCEEDED", + "sqlState" : "42836", + "messageParameters" : { + "rowLimit" : "50" + } +} -- !query @@ -413,68 +361,16 @@ WITH RECURSIVE t MAX RECURSION LEVEL 100 AS ( ) SELECT * FROM t LIMIT ALL -- !query schema -struct +struct<> -- !query output -1 -10 -11 -12 -13 -14 -15 -16 -17 -18 -19 -2 -20 -21 -22 -23 -24 -25 -26 -27 -28 -29 -3 -30 -31 -32 -33 -34 -35 -36 -37 -38 -39 -4 -40 -41 -42 -43 -44 -45 -46 -47 -48 -49 -5 -50 -51 -52 -53 -54 -55 -56 -57 -58 -59 -6 -60 -7 -8 -9 +org.apache.spark.SparkException +{ + "errorClass" : "RECURSION_ROW_LIMIT_EXCEEDED", + "sqlState" : "42836", + "messageParameters" : { + "rowLimit" : "50" + } +} -- !query @@ -505,128 +401,16 @@ WITH RECURSIVE t MAX RECURSION LEVEL 100 AS ( ) (SELECT n FROM t LIMIT ALL) UNION ALL (SELECT n FROM t LIMIT ALL) -- !query schema -struct +struct<> -- !query output -1 -1 -10 -10 -11 -11 -12 -12 -13 -13 -14 -14 -15 -15 -16 -16 -17 -17 -18 -18 -19 -19 -2 -2 -20 -20 -21 -21 -22 -22 -23 -23 -24 -24 -25 -25 -26 -26 -27 -27 -28 -28 -29 -29 -3 -3 -30 -30 -31 -31 -32 -32 -33 -33 -34 -34 -35 -35 -36 -36 -37 -37 -38 -38 -39 -39 -4 -4 -40 -40 -41 -41 -42 -42 -43 -43 -44 -44 -45 -45 -46 -46 -47 -47 -48 -48 -49 -49 -5 -5 -50 -50 -51 -51 -52 -52 -53 -53 -54 -54 -55 -55 -56 -56 -57 -57 -58 -58 -59 -59 -6 -6 -60 -60 -7 -7 -8 -8 -9 -9 +org.apache.spark.SparkException +{ + "errorClass" : "RECURSION_ROW_LIMIT_EXCEEDED", + "sqlState" : "42836", + "messageParameters" : { + "rowLimit" : "50" + } +} -- !query From 5770ff35ccf0a6c64ca9c7942f2dc6dd44114b85 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Fri, 12 Dec 2025 14:20:51 +0800 Subject: [PATCH 14/21] Update WithCTENormalized.scala --- .../spark/sql/catalyst/normalizer/WithCTENormalized.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala index 9e7675b649627..8fb9769a971cc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala @@ -42,7 +42,7 @@ object WithCTENormalized extends Rule[LogicalPlan]{ } def canonicalizeCTE(plan: LogicalPlan, defIdToNewId: Map[Long, Long]): LogicalPlan = { - plan.transformDownWithPruning( + plan.transformDownWithSubqueriesAndPruning( _.containsAnyPattern(CTE, PLAN_EXPRESSION)) { // For nested WithCTE, if defIndex didn't contain the cteId, // means it's not current WithCTE's 
ref. From 7243d1f2bde1333e04da61773b5c23529f6a7232 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Fri, 12 Dec 2025 18:45:01 +0800 Subject: [PATCH 15/21] Update WithCTENormalized.scala --- .../spark/sql/catalyst/normalizer/WithCTENormalized.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala index 8fb9769a971cc..59877a8e09971 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala @@ -42,8 +42,7 @@ object WithCTENormalized extends Rule[LogicalPlan]{ } def canonicalizeCTE(plan: LogicalPlan, defIdToNewId: Map[Long, Long]): LogicalPlan = { - plan.transformDownWithSubqueriesAndPruning( - _.containsAnyPattern(CTE, PLAN_EXPRESSION)) { + plan.transformDownWithSubqueries { // For nested WithCTE, if defIndex didn't contain the cteId, // means it's not current WithCTE's ref. case ref: CTERelationRef if defIdToNewId.contains(ref.cteId) => From 40a4332ce46384d411fad2e4bf22623ee19bb849 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Sun, 14 Dec 2025 13:56:05 +0800 Subject: [PATCH 16/21] Update WithCTENormalized.scala --- .../spark/sql/catalyst/normalizer/WithCTENormalized.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala index 59877a8e09971..cd2dcfb3732fc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.normalizer import org.apache.spark.sql.catalyst.expressions.SubqueryExpression import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect, CTERelationRef, LogicalPlan, WithCTE} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.trees.TreePattern.{CTE, PLAN_EXPRESSION} +import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION object WithCTENormalized extends Rule[LogicalPlan]{ override def apply(plan: LogicalPlan): LogicalPlan = { From 95bcf5d1c6a89c0e676d808c6daba945f17619e8 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Tue, 16 Dec 2025 11:56:56 +0800 Subject: [PATCH 17/21] Update WithCTENormalized.scala --- .../spark/sql/catalyst/normalizer/WithCTENormalized.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala index cd2dcfb3732fc..aa219565d49d5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.normalizer import org.apache.spark.sql.catalyst.expressions.SubqueryExpression import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect, CTERelationRef, LogicalPlan, WithCTE} import org.apache.spark.sql.catalyst.rules.Rule -import 
org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION object WithCTENormalized extends Rule[LogicalPlan]{ override def apply(plan: LogicalPlan): LogicalPlan = { @@ -48,7 +47,7 @@ object WithCTENormalized extends Rule[LogicalPlan]{ case ref: CTERelationRef if defIdToNewId.contains(ref.cteId) => ref.copy(cteId = defIdToNewId(ref.cteId)) case other => - other.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) { + other.transformAllExpressionsWithSubqueries { case e: SubqueryExpression => e.withNewPlan(canonicalizeCTE(e.plan, defIdToNewId)) } } From 1adab5431b2603e4aae2e55d320d3999eb4b301e Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 17 Dec 2025 10:28:26 +0800 Subject: [PATCH 18/21] Update --- .../normalizer/WithCTENormalized.scala | 6 +- .../sql-tests/results/cte-recursion.sql.out | 270 ++++++++++++++++-- 2 files changed, 248 insertions(+), 28 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala index aa219565d49d5..bc03fab830906 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.normalizer import org.apache.spark.sql.catalyst.expressions.SubqueryExpression -import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect, CTERelationRef, LogicalPlan, WithCTE} +import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect, CTERelationRef, LogicalPlan, UnionLoop, UnionLoopRef, WithCTE} import org.apache.spark.sql.catalyst.rules.Rule object WithCTENormalized extends Rule[LogicalPlan]{ @@ -46,6 +46,10 @@ object WithCTENormalized extends Rule[LogicalPlan]{ // means it's not current WithCTE's ref. 
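       // The id remap below also has to cover recursive CTEs: they are
       // planned as UnionLoop (which carries the defining CTE's id) and
       // UnionLoopRef (which carries the loop id), so both receive the same
       // id substitution as a plain CTERelationRef; the two cases added in
       // this hunk do exactly that.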
case ref: CTERelationRef if defIdToNewId.contains(ref.cteId) => ref.copy(cteId = defIdToNewId(ref.cteId)) + case unionLoop: UnionLoop if defIdToNewId.contains(unionLoop.id) => + unionLoop.copy(id = defIdToNewId(unionLoop.id)) + case unionLoopRef: UnionLoopRef if defIdToNewId.contains(unionLoopRef.loopId) => + unionLoopRef.copy(loopId = defIdToNewId(unionLoopRef.loopId)) case other => other.transformAllExpressionsWithSubqueries { case e: SubqueryExpression => e.withNewPlan(canonicalizeCTE(e.plan, defIdToNewId)) diff --git a/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out b/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out index 5ebee57141684..0a2cfb5c706e9 100644 --- a/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/cte-recursion.sql.out @@ -341,16 +341,68 @@ WITH RECURSIVE t(n) MAX RECURSION LEVEL 100 AS ( ) SELECT * FROM t LIMIT ALL -- !query schema -struct<> +struct -- !query output -org.apache.spark.SparkException -{ - "errorClass" : "RECURSION_ROW_LIMIT_EXCEEDED", - "sqlState" : "42836", - "messageParameters" : { - "rowLimit" : "50" - } -} +1 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +2 +20 +21 +22 +23 +24 +25 +26 +27 +28 +29 +3 +30 +31 +32 +33 +34 +35 +36 +37 +38 +39 +4 +40 +41 +42 +43 +44 +45 +46 +47 +48 +49 +5 +50 +51 +52 +53 +54 +55 +56 +57 +58 +59 +6 +60 +7 +8 +9 -- !query @@ -361,16 +413,68 @@ WITH RECURSIVE t MAX RECURSION LEVEL 100 AS ( ) SELECT * FROM t LIMIT ALL -- !query schema -struct<> +struct -- !query output -org.apache.spark.SparkException -{ - "errorClass" : "RECURSION_ROW_LIMIT_EXCEEDED", - "sqlState" : "42836", - "messageParameters" : { - "rowLimit" : "50" - } -} +1 +10 +11 +12 +13 +14 +15 +16 +17 +18 +19 +2 +20 +21 +22 +23 +24 +25 +26 +27 +28 +29 +3 +30 +31 +32 +33 +34 +35 +36 +37 +38 +39 +4 +40 +41 +42 +43 +44 +45 +46 +47 +48 +49 +5 +50 +51 +52 +53 +54 +55 +56 +57 +58 +59 +6 +60 +7 +8 +9 -- !query @@ -401,16 +505,128 @@ WITH RECURSIVE t MAX RECURSION LEVEL 100 AS ( ) (SELECT n FROM t LIMIT ALL) UNION ALL (SELECT n FROM t LIMIT ALL) -- !query schema -struct<> +struct -- !query output -org.apache.spark.SparkException -{ - "errorClass" : "RECURSION_ROW_LIMIT_EXCEEDED", - "sqlState" : "42836", - "messageParameters" : { - "rowLimit" : "50" - } -} +1 +1 +10 +10 +11 +11 +12 +12 +13 +13 +14 +14 +15 +15 +16 +16 +17 +17 +18 +18 +19 +19 +2 +2 +20 +20 +21 +21 +22 +22 +23 +23 +24 +24 +25 +25 +26 +26 +27 +27 +28 +28 +29 +29 +3 +3 +30 +30 +31 +31 +32 +32 +33 +33 +34 +34 +35 +35 +36 +36 +37 +37 +38 +38 +39 +39 +4 +4 +40 +40 +41 +41 +42 +42 +43 +43 +44 +44 +45 +45 +46 +46 +47 +47 +48 +48 +49 +49 +5 +5 +50 +50 +51 +51 +52 +52 +53 +53 +54 +54 +55 +55 +56 +56 +57 +57 +58 +58 +59 +59 +6 +6 +60 +60 +7 +7 +8 +8 +9 +9 -- !query From f99feb1f9dbe933fe493369a6c305e3f6f52ad86 Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Wed, 17 Dec 2025 10:53:34 +0800 Subject: [PATCH 19/21] Update SQLQuerySuite.scala --- .../org/apache/spark/sql/SQLQuerySuite.scala | 40 +++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 74cdee49e55a9..ade023f2b1cb1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -5088,6 +5088,46 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark checkAnswer(sql(query), Row(1, 2)) } } + + 
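The test below pins down the property this series is after: two analyzed plans of the same CTE query, produced by separate executions, differ only in their auto-generated CTE ids, and normalization must erase that difference so the cache lookup can match. A minimal sketch of the property, assuming `p1` and `p2` are two such analyzed `LogicalPlan`s obtained in a test (hypothetical values, not part of the patch):

    // WithCTENormalized is the rule added earlier in this series (renamed to
    // NormalizeCTEIds in a later patch); Rule[LogicalPlan] makes the object
    // directly callable on a plan.
    val n1 = WithCTENormalized(p1)
    val n2 = WithCTENormalized(p2)
    // sameResult compares canonicalized plans, which is the check that
    // CacheManager's cache lookup ultimately performs.
    assert(n1.sameResult(n2))
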
test("SPARK-46741: Cache Table with CTE should work") { + withTempView("t2") { + withTempView("t1") { + sql( + """ + |CREATE TEMPORARY VIEW t1 + |AS + |SELECT * FROM VALUES (0, 0), (1, 1), (2, 2) AS t(c1, c2); + |""".stripMargin) + sql( + """ + |CREATE TEMPORARY VIEW t2 AS + |WITH v as ( + | SELECT c1 + c1 c3 FROM t1 + |) + |SELECT SUM(c3) s FROM v; + |""".stripMargin) + sql( + """ + |CACHE TABLE cache_nested_cte_table + |WITH + |v AS ( + | SELECT c1 * c2 c3 from t1 + |) + |SELECT SUM(c3) FROM v + |EXCEPT + |SELECT s FROM t2; + |""".stripMargin) + + val df = sql("SELECT * FROM cache_nested_cte_table;") + + val inMemoryTableScan = collect(df.queryExecution.executedPlan) { + case i: InMemoryTableScanExec => i + } + assert(inMemoryTableScan.size == 1) + checkAnswer(df, Row(5) :: Nil) + } + } + } } case class Foo(bar: Option[String]) From de0a94d43c9f8a8d26e04f9c39cdca29388bb22a Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Thu, 18 Dec 2025 09:00:51 +0800 Subject: [PATCH 20/21] follow comment --- ...Normalized.scala => NormalizeCTEIds.scala} | 2 +- .../internal/BaseSessionStateBuilder.scala | 4 +- .../test/resources/sql-tests/inputs/cache.sql | 33 -- .../resources/sql-tests/results/cache.sql.out | 293 ------------------ .../apache/spark/sql/CachedTableSuite.scala | 38 +++ .../org/apache/spark/sql/SQLQuerySuite.scala | 40 --- 6 files changed, 41 insertions(+), 369 deletions(-) rename sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/{WithCTENormalized.scala => NormalizeCTEIds.scala} (98%) delete mode 100644 sql/core/src/test/resources/sql-tests/inputs/cache.sql delete mode 100644 sql/core/src/test/resources/sql-tests/results/cache.sql.out diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala similarity index 98% rename from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala rename to sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala index bc03fab830906..e3e0982d194bb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/WithCTENormalized.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala @@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.expressions.SubqueryExpression import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect, CTERelationRef, LogicalPlan, UnionLoop, UnionLoopRef, WithCTE} import org.apache.spark.sql.catalyst.rules.Rule -object WithCTENormalized extends Rule[LogicalPlan]{ +object NormalizeCTEIds extends Rule[LogicalPlan]{ override def apply(plan: LogicalPlan): LogicalPlan = { val curId = new java.util.concurrent.atomic.AtomicLong() plan transformDown { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index f2cd4f3f36532..040733294423c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EvalSubqueriesForTimeTr import org.apache.spark.sql.catalyst.analysis.resolver.ResolverExtension import org.apache.spark.sql.catalyst.catalog.{FunctionExpressionBuilder, SessionCatalog} import 
org.apache.spark.sql.catalyst.expressions.{Expression, ExtractSemiStructuredFields} -import org.apache.spark.sql.catalyst.normalizer.WithCTENormalized +import org.apache.spark.sql.catalyst.normalizer.NormalizeCTEIds import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -404,7 +404,7 @@ abstract class BaseSessionStateBuilder( } protected def planNormalizationRules: Seq[Rule[LogicalPlan]] = { - WithCTENormalized +: + NormalizeCTEIds +: extensions.buildPlanNormalizationRules(session) } diff --git a/sql/core/src/test/resources/sql-tests/inputs/cache.sql b/sql/core/src/test/resources/sql-tests/inputs/cache.sql deleted file mode 100644 index b3588ce49e596..0000000000000 --- a/sql/core/src/test/resources/sql-tests/inputs/cache.sql +++ /dev/null @@ -1,33 +0,0 @@ -CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (0, 0), (1, 1), (2, 2) AS t(c1, c2); -CREATE TEMPORARY VIEW t2 AS -WITH v as ( - SELECT c1 + c1 c3 FROM t1 -) -SELECT SUM(c3) s FROM v; - -CACHE TABLE cache_table -WITH -t2 AS (SELECT 1) -SELECT * FROM t2; - -SELECT * FROM cache_table; - -EXPLAIN EXTENDED SELECT * FROM cache_table; - --- Nested WithCTE -CACHE TABLE cache_nested_cte_table -WITH -v AS ( - SELECT c1 * c2 c3 from t1 -) -SELECT SUM(c3) FROM v -EXCEPT -SELECT s FROM t2; - -SELECT * FROM cache_nested_cte_table; - -EXPLAIN EXTENDED SELECT * FROM cache_nested_cte_table; - --- Clean up -DROP TABLE IF EXISTS t1; -DROP TABLE IF EXISTS t2; diff --git a/sql/core/src/test/resources/sql-tests/results/cache.sql.out b/sql/core/src/test/resources/sql-tests/results/cache.sql.out deleted file mode 100644 index f7c3ec6d9f031..0000000000000 --- a/sql/core/src/test/resources/sql-tests/results/cache.sql.out +++ /dev/null @@ -1,293 +0,0 @@ --- Automatically generated by SQLQueryTestSuite --- !query -CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (0, 0), (1, 1), (2, 2) AS t(c1, c2) --- !query schema -struct<> --- !query output - - - --- !query -CREATE TEMPORARY VIEW t2 AS -WITH v as ( - SELECT c1 + c1 c3 FROM t1 -) -SELECT SUM(c3) s FROM v --- !query schema -struct<> --- !query output - - - --- !query -CACHE TABLE cache_table -WITH -t2 AS (SELECT 1) -SELECT * FROM t2 --- !query schema -struct<> --- !query output - - - --- !query -SELECT * FROM cache_table --- !query schema -struct<1:int> --- !query output -1 - - --- !query -EXPLAIN EXTENDED SELECT * FROM cache_table --- !query schema -struct --- !query output -== Parsed Logical Plan == -'Project [*] -+- 'UnresolvedRelation [cache_table], [], false - -== Analyzed Logical Plan == -1: int -Project [1#x] -+- SubqueryAlias cache_table - +- View (`cache_table`, [1#x]) - +- Project [cast(1#x as int) AS 1#x] - +- WithCTE - :- CTERelationDef xxxx, false - : +- SubqueryAlias t2 - : +- Project [1 AS 1#x] - : +- OneRowRelation - +- Project [1#x] - +- SubqueryAlias t2 - +- CTERelationRef xxxx, true, [1#x], false, false, 1 - -== Optimized Logical Plan == -InMemoryRelation [1#x], StorageLevel(disk, memory, deserialized, 1 replicas) - +- *Project [1 AS 1#x] - +- *Scan OneRowRelation[] - -== Physical Plan == -Scan In-memory table cache_table [1#x] - +- InMemoryRelation [1#x], StorageLevel(disk, memory, deserialized, 1 replicas) - +- *Project [1 AS 1#x] - +- *Scan OneRowRelation[] - - --- !query -CACHE TABLE cache_nested_cte_table -WITH -v AS ( - SELECT c1 * c2 c3 from t1 -) -SELECT SUM(c3) FROM v -EXCEPT -SELECT s FROM t2 --- !query schema -struct<> --- !query output - - - --- !query 
-SELECT * FROM cache_nested_cte_table --- !query schema -struct --- !query output -5 - - --- !query -EXPLAIN EXTENDED SELECT * FROM cache_nested_cte_table --- !query schema -struct --- !query output -== Parsed Logical Plan == -'Project [*] -+- 'UnresolvedRelation [cache_nested_cte_table], [], false - -== Analyzed Logical Plan == -sum(c3): bigint -Project [sum(c3)#xL] -+- SubqueryAlias cache_nested_cte_table - +- View (`cache_nested_cte_table`, [sum(c3)#xL]) - +- Project [cast(sum(c3)#xL as bigint) AS sum(c3)#xL] - +- WithCTE - :- CTERelationDef xxxx, false - : +- SubqueryAlias v - : +- Project [(c1#x * c2#x) AS c3#x] - : +- SubqueryAlias t1 - : +- View (`t1`, [c1#x, c2#x]) - : +- Project [cast(c1#x as int) AS c1#x, cast(c2#x as int) AS c2#x] - : +- Project [c1#x, c2#x] - : +- SubqueryAlias t - : +- LocalRelation [c1#x, c2#x] - +- Except false - :- Aggregate [sum(c3#x) AS sum(c3)#xL] - : +- SubqueryAlias v - : +- CTERelationRef xxxx, true, [c3#x], false, false - +- Project [s#xL] - +- SubqueryAlias t2 - +- View (`t2`, [s#xL]) - +- Project [cast(s#xL as bigint) AS s#xL] - +- WithCTE - :- CTERelationDef xxxx, false - : +- SubqueryAlias v - : +- Project [(c1#x + c1#x) AS c3#x] - : +- SubqueryAlias t1 - : +- View (`t1`, [c1#x, c2#x]) - : +- Project [cast(c1#x as int) AS c1#x, cast(c2#x as int) AS c2#x] - : +- Project [c1#x, c2#x] - : +- SubqueryAlias t - : +- LocalRelation [c1#x, c2#x] - +- Aggregate [sum(c3#x) AS s#xL] - +- SubqueryAlias v - +- CTERelationRef xxxx, true, [c3#x], false, false - -== Optimized Logical Plan == -InMemoryRelation [sum(c3)#xL], StorageLevel(disk, memory, deserialized, 1 replicas) - +- AdaptiveSparkPlan isFinalPlan=true - +- == Final Plan == - ResultQueryStage 1 - +- *BroadcastHashJoin [coalesce(sum(c3)#xL, 0), isnull(sum(c3)#xL)], [coalesce(s#xL, 0), isnull(s#xL)], LeftAnti, BuildRight, false - :- *Project [ReusedSubquery Subquery subquery#x, [id=#x].sum(c3) AS sum(c3)#xL] - : : +- ReusedSubquery Subquery subquery#x, [id=#x] - : +- *Scan OneRowRelation[] - +- BroadcastQueryStage 0 - +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), isnull(input[0, bigint, true])),false), [plan_id=x] - +- *Project [Subquery subquery#x, [id=#x].s AS s#xL] - : +- Subquery subquery#x, [id=#x] - : +- AdaptiveSparkPlan isFinalPlan=true - +- == Final Plan == - ResultQueryStage 1 - +- *Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] - +- *HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) - +- ShuffleQueryStage 0 - +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] - +- *HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) - +- *Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] - +- *LocalTableScan [c1#x, c2#x] - +- == Initial Plan == - Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] - +- HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) - +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] - +- HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) - +- Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] - +- LocalTableScan [c1#x, c2#x] - +- *Scan OneRowRelation[] - +- == Initial Plan == - BroadcastHashJoin [coalesce(sum(c3)#xL, 0), isnull(sum(c3)#xL)], [coalesce(s#xL, 0), isnull(s#xL)], LeftAnti, BuildRight, false - :- Project [Subquery subquery#x, [id=#x].sum(c3) AS sum(c3)#xL] - : : +- Subquery subquery#x, [id=#x] - : : +- 
AdaptiveSparkPlan isFinalPlan=false - : : +- Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] - : : +- HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) - : : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] - : : +- HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) - : : +- Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] - : : +- LocalTableScan [c1#x, c2#x] - : +- Scan OneRowRelation[] - +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), isnull(input[0, bigint, true])),false), [plan_id=x] - +- Project [Subquery subquery#x, [id=#x].s AS s#xL] - : +- Subquery subquery#x, [id=#x] - : +- AdaptiveSparkPlan isFinalPlan=true - +- == Final Plan == - ResultQueryStage 1 - +- *Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] - +- *HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) - +- ShuffleQueryStage 0 - +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] - +- *HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) - +- *Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] - +- *LocalTableScan [c1#x, c2#x] - +- == Initial Plan == - Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] - +- HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) - +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] - +- HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) - +- Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] - +- LocalTableScan [c1#x, c2#x] - +- Scan OneRowRelation[] - -== Physical Plan == -AdaptiveSparkPlan isFinalPlan=false -+- Scan In-memory table cache_nested_cte_table [sum(c3)#xL] - +- InMemoryRelation [sum(c3)#xL], StorageLevel(disk, memory, deserialized, 1 replicas) - +- AdaptiveSparkPlan isFinalPlan=true - +- == Final Plan == - ResultQueryStage 1 - +- *BroadcastHashJoin [coalesce(sum(c3)#xL, 0), isnull(sum(c3)#xL)], [coalesce(s#xL, 0), isnull(s#xL)], LeftAnti, BuildRight, false - :- *Project [ReusedSubquery Subquery subquery#x, [id=#x].sum(c3) AS sum(c3)#xL] - : : +- ReusedSubquery Subquery subquery#x, [id=#x] - : +- *Scan OneRowRelation[] - +- BroadcastQueryStage 0 - +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), isnull(input[0, bigint, true])),false), [plan_id=x] - +- *Project [Subquery subquery#x, [id=#x].s AS s#xL] - : +- Subquery subquery#x, [id=#x] - : +- AdaptiveSparkPlan isFinalPlan=true - +- == Final Plan == - ResultQueryStage 1 - +- *Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] - +- *HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) - +- ShuffleQueryStage 0 - +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] - +- *HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) - +- *Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] - +- *LocalTableScan [c1#x, c2#x] - +- == Initial Plan == - Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] - +- HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) - +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] - +- HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) - +- Project [(c1#x * c2#x) AS c3#x, (c1#x + 
c1#x) AS c3#x] - +- LocalTableScan [c1#x, c2#x] - +- *Scan OneRowRelation[] - +- == Initial Plan == - BroadcastHashJoin [coalesce(sum(c3)#xL, 0), isnull(sum(c3)#xL)], [coalesce(s#xL, 0), isnull(s#xL)], LeftAnti, BuildRight, false - :- Project [Subquery subquery#x, [id=#x].sum(c3) AS sum(c3)#xL] - : : +- Subquery subquery#x, [id=#x] - : : +- AdaptiveSparkPlan isFinalPlan=false - : : +- Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] - : : +- HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) - : : +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] - : : +- HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) - : : +- Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] - : : +- LocalTableScan [c1#x, c2#x] - : +- Scan OneRowRelation[] - +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), isnull(input[0, bigint, true])),false), [plan_id=x] - +- Project [Subquery subquery#x, [id=#x].s AS s#xL] - : +- Subquery subquery#x, [id=#x] - : +- AdaptiveSparkPlan isFinalPlan=true - +- == Final Plan == - ResultQueryStage 1 - +- *Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] - +- *HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) - +- ShuffleQueryStage 0 - +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] - +- *HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) - +- *Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] - +- *LocalTableScan [c1#x, c2#x] - +- == Initial Plan == - Project [named_struct(sum(c3), sum(c3)#xL, s, s#xL) AS mergedValue#x] - +- HashAggregate(keys=[], functions=[sum(c3#x), sum(c3#x)], output=[sum(c3)#xL, s#xL]) - +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=x] - +- HashAggregate(keys=[], functions=[partial_sum(c3#x), partial_sum(c3#x)], output=[sum#xL, sum#xL]) - +- Project [(c1#x * c2#x) AS c3#x, (c1#x + c1#x) AS c3#x] - +- LocalTableScan [c1#x, c2#x] - +- Scan OneRowRelation[] - - --- !query -DROP TABLE IF EXISTS t1 --- !query schema -struct<> --- !query output - - - --- !query -DROP TABLE IF EXISTS t2 --- !query schema -struct<> --- !query output - diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 12d26c4e195f1..880d8d72c73e7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -2598,6 +2598,44 @@ class CachedTableSuite extends QueryTest with SQLTestUtils } } + test("SPARK-46741: Cache Table with CTE should work") { + withTempView("t1", "t2") { + sql( + """ + |CREATE TEMPORARY VIEW t1 + |AS + |SELECT * FROM VALUES (0, 0), (1, 1), (2, 2) AS t(c1, c2) + |""".stripMargin) + sql( + """ + |CREATE TEMPORARY VIEW t2 AS + |WITH v as ( + | SELECT c1 + c1 c3 FROM t1 + |) + |SELECT SUM(c3) s FROM v + |""".stripMargin) + sql( + """ + |CACHE TABLE cache_nested_cte_table + |WITH + |v AS ( + | SELECT c1 * c2 c3 from t1 + |) + |SELECT SUM(c3) FROM v + |EXCEPT + |SELECT s FROM t2 + |""".stripMargin) + + val df = sql("SELECT * FROM cache_nested_cte_table") + + val inMemoryTableScan = collect(df.queryExecution.executedPlan) { + case i: InMemoryTableScanExec => i + } + assert(inMemoryTableScan.size == 1) + checkAnswer(df, Row(5) :: Nil) + } + } + private def cacheManager = spark.sharedState.cacheManager private 
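One design note on the suite test above: `withTempView("t1", "t2")` cleans up only the two temporary views, while `CACHE TABLE` registers `cache_nested_cte_table` (a temp view backed by the cache) for the whole session. A defensive variant of the cleanup, suggested here rather than taken from the patch:

    // Hypothetical hardening of the test's teardown; IF EXISTS keeps it safe
    // even when the CACHE TABLE statement failed earlier in the test.
    spark.sql("UNCACHE TABLE IF EXISTS cache_nested_cte_table")
    spark.catalog.dropTempView("cache_nested_cte_table")
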
def pinTable( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index ade023f2b1cb1..74cdee49e55a9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -5088,46 +5088,6 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark checkAnswer(sql(query), Row(1, 2)) } } - - test("SPARK-46741: Cache Table with CTE should work") { - withTempView("t2") { - withTempView("t1") { - sql( - """ - |CREATE TEMPORARY VIEW t1 - |AS - |SELECT * FROM VALUES (0, 0), (1, 1), (2, 2) AS t(c1, c2); - |""".stripMargin) - sql( - """ - |CREATE TEMPORARY VIEW t2 AS - |WITH v as ( - | SELECT c1 + c1 c3 FROM t1 - |) - |SELECT SUM(c3) s FROM v; - |""".stripMargin) - sql( - """ - |CACHE TABLE cache_nested_cte_table - |WITH - |v AS ( - | SELECT c1 * c2 c3 from t1 - |) - |SELECT SUM(c3) FROM v - |EXCEPT - |SELECT s FROM t2; - |""".stripMargin) - - val df = sql("SELECT * FROM cache_nested_cte_table;") - - val inMemoryTableScan = collect(df.queryExecution.executedPlan) { - case i: InMemoryTableScanExec => i - } - assert(inMemoryTableScan.size == 1) - checkAnswer(df, Row(5) :: Nil) - } - } - } } case class Foo(bar: Option[String]) From d10fd2b075508b692c692e0de027aa5176cad97c Mon Sep 17 00:00:00 2001 From: Angerszhuuuu Date: Thu, 18 Dec 2025 09:29:01 +0800 Subject: [PATCH 21/21] update --- .../catalyst/normalizer/NormalizeCTEIds.scala | 5 - .../sql-tests/analyzer-results/cache.sql.out | 184 ------------------ 2 files changed, 189 deletions(-) delete mode 100644 sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala index e3e0982d194bb..1b1b526e78140 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/normalizer/NormalizeCTEIds.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalyst.normalizer -import org.apache.spark.sql.catalyst.expressions.SubqueryExpression import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect, CTERelationRef, LogicalPlan, UnionLoop, UnionLoopRef, WithCTE} import org.apache.spark.sql.catalyst.rules.Rule @@ -50,10 +49,6 @@ object NormalizeCTEIds extends Rule[LogicalPlan]{ unionLoop.copy(id = defIdToNewId(unionLoop.id)) case unionLoopRef: UnionLoopRef if defIdToNewId.contains(unionLoopRef.loopId) => unionLoopRef.copy(loopId = defIdToNewId(unionLoopRef.loopId)) - case other => - other.transformAllExpressionsWithSubqueries { - case e: SubqueryExpression => e.withNewPlan(canonicalizeCTE(e.plan, defIdToNewId)) - } } } } diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out deleted file mode 100644 index e93da8b86b890..0000000000000 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/cache.sql.out +++ /dev/null @@ -1,184 +0,0 @@ --- Automatically generated by SQLQueryTestSuite --- !query -CREATE TEMPORARY VIEW t1 AS SELECT * FROM VALUES (0, 0), (1, 1), (2, 2) AS t(c1, c2) --- !query analysis -CreateViewCommand `t1`, SELECT * FROM VALUES (0, 0), (1, 1), (2, 2) AS t(c1, c2), false, false, LocalTempView, UNSUPPORTED, true - +- Project [c1#x, c2#x] - 
+- SubqueryAlias t - +- LocalRelation [c1#x, c2#x] - - --- !query -CREATE TEMPORARY VIEW t2 AS -WITH v as ( - SELECT c1 + c1 c3 FROM t1 -) -SELECT SUM(c3) s FROM v --- !query analysis -CreateViewCommand `t2`, WITH v as ( - SELECT c1 + c1 c3 FROM t1 -) -SELECT SUM(c3) s FROM v, false, false, LocalTempView, UNSUPPORTED, true - +- WithCTE - :- CTERelationDef xxxx, false - : +- SubqueryAlias v - : +- Project [(c1#x + c1#x) AS c3#x] - : +- SubqueryAlias t1 - : +- View (`t1`, [c1#x, c2#x]) - : +- Project [cast(c1#x as int) AS c1#x, cast(c2#x as int) AS c2#x] - : +- Project [c1#x, c2#x] - : +- SubqueryAlias t - : +- LocalRelation [c1#x, c2#x] - +- Aggregate [sum(c3#x) AS s#xL] - +- SubqueryAlias v - +- CTERelationRef xxxx, true, [c3#x], false, false - - --- !query -CACHE TABLE cache_table -WITH -t2 AS (SELECT 1) -SELECT * FROM t2 --- !query analysis -CacheTableAsSelect cache_table, WITH -t2 AS (SELECT 1) -SELECT * FROM t2, false, true - +- WithCTE - :- CTERelationDef xxxx, false - : +- SubqueryAlias t2 - : +- Project [1 AS 1#x] - : +- OneRowRelation - +- Project [1#x] - +- SubqueryAlias t2 - +- CTERelationRef xxxx, true, [1#x], false, false, 1 - - --- !query -SELECT * FROM cache_table --- !query analysis -Project [1#x] -+- SubqueryAlias cache_table - +- View (`cache_table`, [1#x]) - +- Project [cast(1#x as int) AS 1#x] - +- WithCTE - :- CTERelationDef xxxx, false - : +- SubqueryAlias t2 - : +- Project [1 AS 1#x] - : +- OneRowRelation - +- Project [1#x] - +- SubqueryAlias t2 - +- CTERelationRef xxxx, true, [1#x], false, false, 1 - - --- !query -EXPLAIN EXTENDED SELECT * FROM cache_table --- !query analysis -ExplainCommand 'Project [*], ExtendedMode - - --- !query -CACHE TABLE cache_nested_cte_table -WITH -v AS ( - SELECT c1 * c2 c3 from t1 -) -SELECT SUM(c3) FROM v -EXCEPT -SELECT s FROM t2 --- !query analysis -CacheTableAsSelect cache_nested_cte_table, WITH -v AS ( - SELECT c1 * c2 c3 from t1 -) -SELECT SUM(c3) FROM v -EXCEPT -SELECT s FROM t2, false, true - +- WithCTE - :- CTERelationDef xxxx, false - : +- SubqueryAlias v - : +- Project [(c1#x * c2#x) AS c3#x] - : +- SubqueryAlias t1 - : +- View (`t1`, [c1#x, c2#x]) - : +- Project [cast(c1#x as int) AS c1#x, cast(c2#x as int) AS c2#x] - : +- Project [c1#x, c2#x] - : +- SubqueryAlias t - : +- LocalRelation [c1#x, c2#x] - +- Except false - :- Aggregate [sum(c3#x) AS sum(c3)#xL] - : +- SubqueryAlias v - : +- CTERelationRef xxxx, true, [c3#x], false, false - +- Project [s#xL] - +- SubqueryAlias t2 - +- View (`t2`, [s#xL]) - +- Project [cast(s#xL as bigint) AS s#xL] - +- WithCTE - :- CTERelationDef xxxx, false - : +- SubqueryAlias v - : +- Project [(c1#x + c1#x) AS c3#x] - : +- SubqueryAlias t1 - : +- View (`t1`, [c1#x, c2#x]) - : +- Project [cast(c1#x as int) AS c1#x, cast(c2#x as int) AS c2#x] - : +- Project [c1#x, c2#x] - : +- SubqueryAlias t - : +- LocalRelation [c1#x, c2#x] - +- Aggregate [sum(c3#x) AS s#xL] - +- SubqueryAlias v - +- CTERelationRef xxxx, true, [c3#x], false, false - - --- !query -SELECT * FROM cache_nested_cte_table --- !query analysis -Project [sum(c3)#xL] -+- SubqueryAlias cache_nested_cte_table - +- View (`cache_nested_cte_table`, [sum(c3)#xL]) - +- Project [cast(sum(c3)#xL as bigint) AS sum(c3)#xL] - +- WithCTE - :- CTERelationDef xxxx, false - : +- SubqueryAlias v - : +- Project [(c1#x * c2#x) AS c3#x] - : +- SubqueryAlias t1 - : +- View (`t1`, [c1#x, c2#x]) - : +- Project [cast(c1#x as int) AS c1#x, cast(c2#x as int) AS c2#x] - : +- Project [c1#x, c2#x] - : +- SubqueryAlias t - : +- LocalRelation [c1#x, c2#x] - +- Except 
false - :- Aggregate [sum(c3#x) AS sum(c3)#xL] - : +- SubqueryAlias v - : +- CTERelationRef xxxx, true, [c3#x], false, false - +- Project [s#xL] - +- SubqueryAlias t2 - +- View (`t2`, [s#xL]) - +- Project [cast(s#xL as bigint) AS s#xL] - +- WithCTE - :- CTERelationDef xxxx, false - : +- SubqueryAlias v - : +- Project [(c1#x + c1#x) AS c3#x] - : +- SubqueryAlias t1 - : +- View (`t1`, [c1#x, c2#x]) - : +- Project [cast(c1#x as int) AS c1#x, cast(c2#x as int) AS c2#x] - : +- Project [c1#x, c2#x] - : +- SubqueryAlias t - : +- LocalRelation [c1#x, c2#x] - +- Aggregate [sum(c3#x) AS s#xL] - +- SubqueryAlias v - +- CTERelationRef xxxx, true, [c3#x], false, false - - --- !query -EXPLAIN EXTENDED SELECT * FROM cache_nested_cte_table --- !query analysis -ExplainCommand 'Project [*], ExtendedMode - - --- !query -DROP TABLE IF EXISTS t1 --- !query analysis -DropTempViewCommand t1 - - --- !query -DROP TABLE IF EXISTS t2 --- !query analysis -DropTempViewCommand t2
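

For reference, stitching the hunks of this series together gives approximately the following final shape of the normalizer after the last patch. Only canonicalizeCTE appears verbatim in the diffs; the body of apply past `plan transformDown {` is never shown in full, so the WithCTE case here is an inferred sketch (marked as such in comments), not the committed code.

    package org.apache.spark.sql.catalyst.normalizer

    import org.apache.spark.sql.catalyst.plans.logical.{CacheTableAsSelect, CTERelationRef, LogicalPlan, UnionLoop, UnionLoopRef, WithCTE}
    import org.apache.spark.sql.catalyst.rules.Rule

    object NormalizeCTEIds extends Rule[LogicalPlan] {
      override def apply(plan: LogicalPlan): LogicalPlan = {
        val curId = new java.util.concurrent.atomic.AtomicLong()
        plan transformDown {
          // Inferred sketch: give every definition of a WithCTE a stable,
          // traversal-ordered id and rewrite all plans that mention the old
          // ids. (The CacheTableAsSelect import suggests the committed rule
          // also reaches into CacheTableAsSelect.plan; that case is omitted
          // here because no hunk shows it.)
          case withCTE: WithCTE =>
            val defIdToNewId =
              withCTE.cteDefs.map(d => d.id -> curId.getAndIncrement()).toMap
            withCTE.copy(
              plan = canonicalizeCTE(withCTE.plan, defIdToNewId),
              cteDefs = withCTE.cteDefs.map { cteDef =>
                cteDef.copy(
                  child = canonicalizeCTE(cteDef.child, defIdToNewId),
                  id = defIdToNewId(cteDef.id))
              })
        }
      }

      // Shown by the diffs. After the last patch it relies on
      // transformDownWithSubqueries to reach refs hidden inside subquery
      // expressions, which is why the explicit SubqueryExpression case from
      // the earlier revisions could be dropped.
      def canonicalizeCTE(plan: LogicalPlan, defIdToNewId: Map[Long, Long]): LogicalPlan = {
        plan.transformDownWithSubqueries {
          // For nested WithCTE: a cteId absent from the map belongs to a
          // different WithCTE and is left for that node's own rewrite.
          case ref: CTERelationRef if defIdToNewId.contains(ref.cteId) =>
            ref.copy(cteId = defIdToNewId(ref.cteId))
          case unionLoop: UnionLoop if defIdToNewId.contains(unionLoop.id) =>
            unionLoop.copy(id = defIdToNewId(unionLoop.id))
          case unionLoopRef: UnionLoopRef if defIdToNewId.contains(unionLoopRef.loopId) =>
            unionLoopRef.copy(loopId = defIdToNewId(unionLoopRef.loopId))
        }
      }
    }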