From af0fea91542b0ab7d1503f8bb42cfe02d6ef4d6b Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Tue, 2 Sep 2025 22:35:56 -0700
Subject: [PATCH 01/10] [SPARK-53469] Ability to clean up shuffle in Thrift
 server

---
 .../scala/org/apache/spark/sql/classic/SparkSession.scala   | 3 ++-
 .../apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala | 5 ++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
index 5811fe759d3e5..b2a0fd0d69bc9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
@@ -583,7 +583,8 @@ class SparkSession private(
       }
       queryPlan
     }
-    Dataset.ofRows(self, plan, tracker)
+    Dataset.ofRows(self, plan, tracker,
+      QueryExecution.determineShuffleCleanupMode(sessionState.conf))
   }

   /** @inheritdoc */
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 8b9b7352fdca2..3c904b6e3a349 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -30,8 +30,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.COMMAND
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.CommandResult
-import org.apache.spark.sql.classic.ClassicConversions._
-import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, SQLExecution}
+ import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, SQLExecution}
 import org.apache.spark.sql.execution.HiveResult.hiveResultString
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 import org.apache.spark.util.Utils
@@ -67,7 +66,7 @@ private[hive] class SparkSQLDriver(val sparkSession: SparkSession = SparkSQLEnv.
       new VariableSubstitution().substitute(command)
     }
     sparkSession.sparkContext.setJobDescription(substitutorCommand)
-    val execution = sparkSession.sessionState.executePlan(sparkSession.sql(command).logicalPlan)
+    val execution = sparkSession.sql(command).queryExecution
     // The SQL command has been executed above via `executePlan`, therefore we don't need to
     // wrap it again with a new execution ID when getting Hive result.
     execution.logical match {

From 5d5be902b99f9b4c8702d7d0e1eca927f8195429 Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Thu, 4 Sep 2025 18:56:57 -0700
Subject: [PATCH 02/10] Fix test

---
 .../adaptive/AdaptiveQueryExecSuite.scala | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 3e7d26f74bd46..2d4b176d9d0f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -69,6 +69,16 @@ class AdaptiveQueryExecSuite

   setupTestData()

+  protected override def beforeAll(): Unit = {
+    super.beforeAll()
+    sqlConf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false)
+  }
+
+  protected override def afterAll(): Unit = {
+    sqlConf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, true)
+    super.afterAll()
+  }
+
   private def runAdaptiveAndVerifyResult(query: String,
       skipCheckAnswer: Boolean = false): (SparkPlan, SparkPlan) = {
     var finalPlanCnt = 0
@@ -2036,9 +2046,7 @@ class AdaptiveQueryExecSuite
     }

     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
-      // Disabling cleanup as the test assertions depend on the shuffle files
-      SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false") {
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
       val df = sql(
         """
           |SELECT * FROM (

From d292715c739cd7af548f391bf0b6bb98eae6c068 Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Fri, 5 Sep 2025 18:41:24 -0700
Subject: [PATCH 03/10] Fix test: InjectRuntimeFilterSuite

---
 .../scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
index 603ec183bfb61..d7b0227fcc08f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEProp
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.util.Utils

 class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSparkSession
   with AdaptiveSparkPlanHelper {
@@ -205,6 +206,7 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
     sql("analyze table bf5part compute statistics for columns a5, b5, c5, d5, e5, f5")
     sql("analyze table bf5filtered compute statistics for columns a5, b5, c5, d5, e5, f5")

+    conf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false)
     // `MergeScalarSubqueries` can duplicate subqueries in the optimized plan and would make testing
     // complicated.
     conf.setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, MergeSubplans.ruleName)
@@ -213,6 +215,7 @@
   protected override def afterAll(): Unit = try {
     conf.setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
       SQLConf.OPTIMIZER_EXCLUDED_RULES.defaultValueString)
+    conf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, Utils.isTesting)

     sql("DROP TABLE IF EXISTS bf1")
     sql("DROP TABLE IF EXISTS bf2")

From 038f3cd65dde8c2a85022ff647abf0f12704f09e Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Sat, 20 Sep 2025 21:16:08 -0700
Subject: [PATCH 04/10] Address review comment: add code comment

---
 .../scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala | 2 ++
 .../spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 2 ++
 2 files changed, 4 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
index d7b0227fcc08f..33868f4bc10ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
@@ -206,6 +206,8 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
     sql("analyze table bf5part compute statistics for columns a5, b5, c5, d5, e5, f5")
     sql("analyze table bf5filtered compute statistics for columns a5, b5, c5, d5, e5, f5")

+    // Tests depend on intermediate results that would otherwise be cleaned up when
+    // shuffle cleanup is enabled, causing test failures.
     conf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false)
     // `MergeScalarSubqueries` can duplicate subqueries in the optimized plan and would make testing
     // complicated.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 2d4b176d9d0f1..73817fa51e0b1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -71,6 +71,8 @@ class AdaptiveQueryExecSuite

   protected override def beforeAll(): Unit = {
     super.beforeAll()
+    // Tests depend on intermediate results that would otherwise be cleaned up when
+    // shuffle cleanup is enabled, causing test failures.
     sqlConf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false)
   }

From 600793d99aa3665bb0a6683f33f03acb5a3a9447 Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Mon, 29 Sep 2025 15:27:43 -0700
Subject: [PATCH 05/10] Workaround to make test pass; the correct fix is to
 fix adaptive execution

---
 .../apache/spark/sql/InjectRuntimeFilterSuite.scala | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
index 33868f4bc10ed..861fe5a369778 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEProp
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.sql.types.{IntegerType, StructType}
-import org.apache.spark.util.Utils

 class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSparkSession
   with AdaptiveSparkPlanHelper {
@@ -49,6 +48,9 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
     Seq(4, 0, 86, null, 96, 14),
     Seq(28, 16, 58, null, null, null),
     Seq(1, 88, null, 8, null, 79),
+    Seq(5, 88, 62, 8, null, 79),
+    Seq(62, 88, 40, 8, null, 79),
+    // Seq(67, 88, 73, 8, null, 79),
     Seq(59, null, null, null, 20, 25),
     Seq(1, 50, null, 94, 94, null),
     Seq(null, null, null, 67, 51, 57),
@@ -93,6 +95,8 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
     Seq(53, null, 6, 68, 28, 13),
     Seq(null, null, null, null, 89, 23),
     Seq(36, 73, 40, null, 8, null),
+    Seq(62, 40, 40, null, 8, 100),
+    Seq(5, 73, 40, null, 8, null),
     Seq(24, null, null, 40, null, null))
   val rdd2 = spark.sparkContext.parallelize(data2)
   val rddRow2 = rdd2.map(s => Row.fromSeq(s))
@@ -189,7 +193,7 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
     Seq(75, null, 15, null, 81, null),
     Seq(53, null, 6, 68, 28, 13),
     Seq(null, null, null, null, 89, 23),
-    Seq(36, 73, 40, null, 8, null),
+    Seq(36, 73, 40, null, 8, 100),
     Seq(24, null, null, 40, null, null))
   val rdd5part = spark.sparkContext.parallelize(data5part)
   val rddRow5part = rdd5part.map(s => Row.fromSeq(s))
@@ -206,9 +210,6 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
     sql("analyze table bf5part compute statistics for columns a5, b5, c5, d5, e5, f5")
     sql("analyze table bf5filtered compute statistics for columns a5, b5, c5, d5, e5, f5")

-    // Tests depend on intermediate results that would otherwise be cleaned up when
-    // shuffle cleanup is enabled, causing test failures.
-    conf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false)
     // `MergeScalarSubqueries` can duplicate subqueries in the optimized plan and would make testing
     // complicated.
     conf.setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, MergeSubplans.ruleName)
@@ -217,7 +218,6 @@
   protected override def afterAll(): Unit = try {
     conf.setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key,
       SQLConf.OPTIMIZER_EXCLUDED_RULES.defaultValueString)
-    conf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, Utils.isTesting)

     sql("DROP TABLE IF EXISTS bf1")
     sql("DROP TABLE IF EXISTS bf2")

From 41ccfde6bb287ad36cb1034cad9e3332d52d6afd Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Tue, 14 Oct 2025 11:41:24 -0700
Subject: [PATCH 06/10] Revert testdata changes

---
 .../org/apache/spark/sql/InjectRuntimeFilterSuite.scala | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
index 861fe5a369778..603ec183bfb61 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/InjectRuntimeFilterSuite.scala
@@ -48,9 +48,6 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
     Seq(4, 0, 86, null, 96, 14),
     Seq(28, 16, 58, null, null, null),
     Seq(1, 88, null, 8, null, 79),
-    Seq(5, 88, 62, 8, null, 79),
-    Seq(62, 88, 40, 8, null, 79),
-    // Seq(67, 88, 73, 8, null, 79),
     Seq(59, null, null, null, 20, 25),
     Seq(1, 50, null, 94, 94, null),
     Seq(null, null, null, 67, 51, 57),
@@ -95,8 +92,6 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
     Seq(53, null, 6, 68, 28, 13),
     Seq(null, null, null, null, 89, 23),
     Seq(36, 73, 40, null, 8, null),
-    Seq(62, 40, 40, null, 8, 100),
-    Seq(5, 73, 40, null, 8, null),
     Seq(24, null, null, 40, null, null))
   val rdd2 = spark.sparkContext.parallelize(data2)
   val rddRow2 = rdd2.map(s => Row.fromSeq(s))
@@ -193,7 +188,7 @@ class InjectRuntimeFilterSuite extends QueryTest with SQLTestUtils with SharedSp
     Seq(75, null, 15, null, 81, null),
     Seq(53, null, 6, 68, 28, 13),
     Seq(null, null, null, null, 89, 23),
-    Seq(36, 73, 40, null, 8, 100),
+    Seq(36, 73, 40, null, 8, null),
     Seq(24, null, null, 40, null, null))
   val rdd5part = spark.sparkContext.parallelize(data5part)
   val rddRow5part = rdd5part.map(s => Row.fromSeq(s))

From 3733b978260055a516cc7c37d62ebe36cbce03cb Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Tue, 18 Nov 2025 23:02:21 -0800
Subject: [PATCH 07/10] Address review comment

---
 .../org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 3c904b6e3a349..02c47a88af074 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.internal.LogKeys.COMMAND
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.CommandResult
- import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, SQLExecution}
+import org.apache.spark.sql.execution.{QueryExecution, QueryExecutionException, SQLExecution}
 import org.apache.spark.sql.execution.HiveResult.hiveResultString
 import org.apache.spark.sql.internal.{SQLConf, VariableSubstitution}
 import org.apache.spark.util.Utils

From 0d6d35b969ebef1b1e84ceaf1a4cded8db44a216 Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Wed, 19 Nov 2025 10:35:28 -0800
Subject: [PATCH 08/10] Address review comments: Disable shuffle cleanup in
 SparkConf

---
 .../adaptive/AdaptiveQueryExecSuite.scala | 15 +++------------
 1 file changed, 3 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 73817fa51e0b1..144a6f59f48c3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -23,7 +23,7 @@ import java.net.URI
 import org.apache.logging.log4j.Level
 import org.scalatest.PrivateMethodTester

-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
 import org.apache.spark.shuffle.sort.SortShuffleManager
@@ -69,17 +69,8 @@ class AdaptiveQueryExecSuite

   setupTestData()

-  protected override def beforeAll(): Unit = {
-    super.beforeAll()
-    // Tests depend on intermediate results that would otherwise be cleaned up when
-    // shuffle cleanup is enabled, causing test failures.
-    sqlConf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false)
-  }
-
-  protected override def afterAll(): Unit = {
-    sqlConf.setConf(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, true)
-    super.afterAll()
-  }
+  override def sparkConf: SparkConf = super.sparkConf
+    .set(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false)

   private def runAdaptiveAndVerifyResult(query: String,
       skipCheckAnswer: Boolean = false): (SparkPlan, SparkPlan) = {
     var finalPlanCnt = 0

From 7e7d2e04409562d5d5d10832e6704ce4bd9334b5 Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Tue, 25 Nov 2025 14:06:04 -0800
Subject: [PATCH 09/10] Address review comments: remove comment

---
 .../apache/spark/sql/internal/SQLConf.scala |  8 ++++++++
 .../spark/sql/classic/SparkSession.scala    |  3 +--
 .../adaptive/AdaptiveQueryExecSuite.scala   |  9 ++++-----
 .../hive/thriftserver/SparkSQLDriver.scala  | 19 +++++++++++++++++--
 4 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 8b50abbe4052c..2fc468ad84ec5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3822,6 +3822,14 @@ object SQLConf {
       .version("4.1.0")
       .fallbackConf(SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)

+  val THRIFT_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED =
+    buildConf("spark.sql.thrift.shuffleDependency.fileCleanup.enabled")
+      .doc("When enabled, shuffle files will be cleaned up at the end of Thrift server " +
+        "SQL executions.")
+      .version("4.2.0")
+      .booleanConf
+      .createWithDefault(Utils.isTesting)
+
   val SORT_MERGE_JOIN_EXEC_BUFFER_IN_MEMORY_THRESHOLD =
     buildConf("spark.sql.sortMergeJoinExec.buffer.in.memory.threshold")
       .internal()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
index b2a0fd0d69bc9..5811fe759d3e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala
@@ -583,8 +583,7 @@ class SparkSession private(
       }
       queryPlan
     }
-    Dataset.ofRows(self, plan, tracker,
-      QueryExecution.determineShuffleCleanupMode(sessionState.conf))
+    Dataset.ofRows(self, plan, tracker)
   }

   /** @inheritdoc */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 144a6f59f48c3..3e7d26f74bd46 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -23,7 +23,7 @@ import java.net.URI
 import org.apache.logging.log4j.Level
 import org.scalatest.PrivateMethodTester

-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.SparkException
 import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
 import org.apache.spark.shuffle.sort.SortShuffleManager
@@ -69,9 +69,6 @@ class AdaptiveQueryExecSuite

   setupTestData()

-  override def sparkConf: SparkConf = super.sparkConf
-    .set(SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED, false)
-
   private def runAdaptiveAndVerifyResult(query: String,
       skipCheckAnswer: Boolean = false): (SparkPlan, SparkPlan) = {
     var finalPlanCnt = 0
@@ -2039,7 +2036,9 @@ class AdaptiveQueryExecSuite
     }

     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
-      SQLConf.SHUFFLE_PARTITIONS.key -> "5") {
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      // Disabling cleanup as the test assertions depend on the shuffle files
+      SQLConf.CLASSIC_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED.key -> "false") {
       val df = sql(
         """
           |SELECT * FROM (
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index 02c47a88af074..f23cd3df1bbad 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -66,8 +66,23 @@ private[hive] class SparkSQLDriver(val sparkSession: SparkSession = SparkSQLEnv.
       new VariableSubstitution().substitute(command)
     }
     sparkSession.sparkContext.setJobDescription(substitutorCommand)
-    val execution = sparkSession.sql(command).queryExecution
-    // The SQL command has been executed above via `executePlan`, therefore we don't need to
+
+    val logicalPlan = sparkSession.sessionState.sqlParser.parsePlan(substitutorCommand)
+    val conf = sparkSession.sessionState.conf
+
+    val shuffleCleanupMode =
+      if (conf.getConf(SQLConf.THRIFT_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)) {
+        org.apache.spark.sql.execution.RemoveShuffleFiles
+      } else {
+        org.apache.spark.sql.execution.DoNotCleanup
+      }
+
+    val execution = new QueryExecution(
+      sparkSession.asInstanceOf[org.apache.spark.sql.classic.SparkSession],
+      logicalPlan,
+      shuffleCleanupMode = shuffleCleanupMode)
+
+    // The above execution already has an execution ID, therefore we don't need to
     // wrap it again with a new execution ID when getting Hive result.
     execution.logical match {
       case _: CommandResult =>

From 64a3a9e0ed6a1f99463a52d80ebccfd6764a11bd Mon Sep 17 00:00:00 2001
From: Karuppayya Rajendran
Date: Wed, 26 Nov 2025 08:13:16 -0800
Subject: [PATCH 10/10] Address review comment: Use thriftserver instead of
 thrift

---
 .../main/scala/org/apache/spark/sql/internal/SQLConf.scala  | 4 ++--
 .../apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 2fc468ad84ec5..2f7706c859bae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -3822,8 +3822,8 @@ object SQLConf {
       .version("4.1.0")
       .fallbackConf(SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)

-  val THRIFT_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED =
-    buildConf("spark.sql.thrift.shuffleDependency.fileCleanup.enabled")
+  val THRIFTSERVER_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED =
+    buildConf("spark.sql.thriftserver.shuffleDependency.fileCleanup.enabled")
       .doc("When enabled, shuffle files will be cleaned up at the end of Thrift server " +
         "SQL executions.")
       .version("4.2.0")
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index f23cd3df1bbad..7a220d516757a 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -71,7 +71,7 @@ private[hive] class SparkSQLDriver(val sparkSession: SparkSession = SparkSQLEnv.
     val conf = sparkSession.sessionState.conf

     val shuffleCleanupMode =
-      if (conf.getConf(SQLConf.THRIFT_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)) {
+      if (conf.getConf(SQLConf.THRIFTSERVER_SHUFFLE_DEPENDENCY_FILE_CLEANUP_ENABLED)) {
         org.apache.spark.sql.execution.RemoveShuffleFiles
       } else {
         org.apache.spark.sql.execution.DoNotCleanup
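
Usage note (illustrative, not part of the patches): the config key and the
RemoveShuffleFiles/DoNotCleanup objects below come from the series above;
everything else in this note is an assumption for demonstration. Since the
flag defaults to Utils.isTesting, a production deployment has to opt in
explicitly when launching the Thrift server:

    sbin/start-thriftserver.sh \
      --conf spark.sql.thriftserver.shuffleDependency.fileCleanup.enabled=true

A minimal sketch of the mode selection that SparkSQLDriver.run performs after
PATCH 09/10, with local stand-ins for the execution objects so it can be
pasted into a Scala REPL outside the Spark tree:

    // Local stand-ins for org.apache.spark.sql.execution.{ShuffleCleanupMode,
    // RemoveShuffleFiles, DoNotCleanup}; the real objects live in spark-sql.
    sealed trait ShuffleCleanupMode
    case object RemoveShuffleFiles extends ShuffleCleanupMode
    case object DoNotCleanup extends ShuffleCleanupMode

    // Mirrors the if/else added in PATCH 09 (config key renamed in PATCH 10):
    // cleanup is requested only when the Thrift server flag is true.
    def cleanupMode(fileCleanupEnabled: Boolean): ShuffleCleanupMode =
      if (fileCleanupEnabled) RemoveShuffleFiles else DoNotCleanup

    assert(cleanupMode(fileCleanupEnabled = true) == RemoveShuffleFiles)
    assert(cleanupMode(fileCleanupEnabled = false) == DoNotCleanup)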